未验证 提交 e9e1acb3 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #11026 from taosdata/feature/tdb-merge

Feature/tdb merge
......@@ -8,7 +8,6 @@ target_sources(tdb
"src/db/tdbBtree.c"
"src/db/tdbDb.c"
"src/db/tdbEnv.c"
# "src/db/tdbPage.c"
"src/page/tdbPage.c"
"src/page/tdbPageL.c"
)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TDB_BTREE_INT_H_
#define _TDB_BTREE_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TDB_BTREE_INT_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
......@@ -39,14 +39,24 @@ struct SBTree {
u8 *pTmp;
};
#define TDB_BTREE_PAGE_COMMON_HDR u8 flags;
#define TDB_BTREE_PAGE_GET_FLAGS(PAGE) (PAGE)->pData[0]
#define TDB_BTREE_PAGE_SET_FLAGS(PAGE, flags) ((PAGE)->pData[0] = (flags))
typedef struct __attribute__((__packed__)) {
SPgno rChild;
} SBtPageHdr;
TDB_BTREE_PAGE_COMMON_HDR
} SLeafHdr;
typedef struct __attribute__((__packed__)) {
TDB_BTREE_PAGE_COMMON_HDR;
SPgno pgno; // right-most child
} SIntHdr;
typedef struct {
u16 flags;
u8 flags;
SBTree *pBt;
} SBtreeZeroPageArg;
} SBtreeInitPageArg;
typedef struct {
int kLen;
......@@ -57,7 +67,7 @@ typedef struct {
u8 *pTmpSpace;
} SCellDecoder;
static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *pCRst);
static int tdbBtCursorMoveTo(SBTC *pCur, const void *pKey, int kLen, int *pCRst);
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2);
static int tdbBtreeOpenImpl(SBTree *pBt);
static int tdbBtreeZeroPage(SPage *pPage, void *arg);
......@@ -65,7 +75,11 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg);
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
int *szCell);
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder);
static int tdbBtreeBalance(SBtCursor *pCur);
static int tdbBtreeBalance(SBTC *pCur);
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell);
static int tdbBtcMoveToNext(SBTC *pBtc);
static int tdbBtcMoveDownward(SBTC *pCur, SPgno pgno);
static int tdbBtcMoveUpward(SBTC *pBtc);
int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, SBTree **ppBt) {
SBTree *pBt;
......@@ -73,7 +87,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S
*ppBt = NULL;
pBt = (SBTree *)taosMemoryCalloc(1, sizeof(*pBt));
pBt = (SBTree *)calloc(1, sizeof(*pBt));
if (pBt == NULL) {
return -1;
}
......@@ -107,7 +121,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S
// TODO: pBt->root
ret = tdbBtreeOpenImpl(pBt);
if (ret < 0) {
taosMemoryFree(pBt);
free(pBt);
return -1;
}
......@@ -120,16 +134,7 @@ int tdbBtreeClose(SBTree *pBt) {
return 0;
}
int tdbBtreeCursor(SBtCursor *pCur, SBTree *pBt) {
pCur->pBt = pBt;
pCur->iPage = -1;
pCur->pPage = NULL;
pCur->idx = -1;
return 0;
}
int tdbBtCursorInsert(SBtCursor *pCur, const void *pKey, int kLen, const void *pVal, int vLen) {
int tdbBtCursorInsert(SBTC *pCur, const void *pKey, int kLen, const void *pVal, int vLen) {
int ret;
int idx;
SPager *pPager;
......@@ -145,7 +150,7 @@ int tdbBtCursorInsert(SBtCursor *pCur, const void *pKey, int kLen, const void *p
}
if (pCur->idx == -1) {
ASSERT(TDB_PAGE_NCELLS(pCur->pPage) == 0);
ASSERT(TDB_PAGE_TOTAL_CELLS(pCur->pPage) == 0);
idx = 0;
} else {
if (cret > 0) {
......@@ -161,7 +166,7 @@ int tdbBtCursorInsert(SBtCursor *pCur, const void *pKey, int kLen, const void *p
// TODO: refact code here
pBt = pCur->pBt;
if (!pBt->pTmp) {
pBt->pTmp = (u8 *)taosMemoryMalloc(pBt->pageSize);
pBt->pTmp = (u8 *)malloc(pBt->pageSize);
if (pBt->pTmp == NULL) {
return -1;
}
......@@ -176,7 +181,7 @@ int tdbBtCursorInsert(SBtCursor *pCur, const void *pKey, int kLen, const void *p
}
// Insert the cell to the index
ret = tdbPageInsertCell(pCur->pPage, idx, pCell, szCell);
ret = tdbPageInsertCell(pCur->pPage, idx, pCell, szCell, 0);
if (ret < 0) {
return -1;
}
......@@ -192,12 +197,36 @@ int tdbBtCursorInsert(SBtCursor *pCur, const void *pKey, int kLen, const void *p
return 0;
}
static int tdbBtCursorMoveToChild(SBtCursor *pCur, SPgno pgno) {
// TODO
int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen) {
SBTC btc;
SCell *pCell;
int cret;
void *pVal;
SCellDecoder cd;
tdbBtcOpen(&btc, pBt);
tdbBtCursorMoveTo(&btc, pKey, kLen, &cret);
if (cret) {
return cret;
}
pCell = tdbPageGetCell(btc.pPage, btc.idx);
tdbBtreeDecodeCell(btc.pPage, pCell, &cd);
*vLen = cd.vLen;
pVal = TDB_REALLOC(*ppVal, *vLen);
if (pVal == NULL) {
return -1;
}
*ppVal = pVal;
memcpy(*ppVal, cd.pVal, cd.vLen);
return 0;
}
static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *pCRst) {
static int tdbBtCursorMoveTo(SBTC *pCur, const void *pKey, int kLen, int *pCRst) {
int ret;
SBTree *pBt;
SPager *pPager;
......@@ -218,9 +247,9 @@ static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *p
pCur->iPage = 0;
if (TDB_PAGE_NCELLS(pCur->pPage) == 0) {
if (TDB_PAGE_TOTAL_CELLS(pCur->pPage) == 0) {
// Current page is empty
ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pCur->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF));
// ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pCur->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF));
return 0;
}
......@@ -231,7 +260,7 @@ static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *p
SCellDecoder cd = {0};
pPage = pCur->pPage;
nCells = TDB_PAGE_NCELLS(pPage);
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
lidx = 0;
ridx = nCells - 1;
......@@ -242,7 +271,7 @@ static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *p
midx = (lidx + ridx) >> 1;
pCell = TDB_PAGE_CELL_AT(pPage, midx);
pCell = tdbPageGetCell(pPage, midx);
ret = tdbBtreeDecodeCell(pPage, pCell, &cd);
if (ret < 0) {
// TODO: handle error
......@@ -265,8 +294,8 @@ static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *p
}
// Move downward or break
u16 flags = TDB_PAGE_FLAGS(pPage);
u8 leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
u8 flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
u8 leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
if (leaf) {
pCur->idx = midx;
*pCRst = c;
......@@ -274,18 +303,16 @@ static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *p
} else {
if (c <= 0) {
pCur->idx = midx;
tdbBtCursorMoveToChild(pCur, cd.pgno);
tdbBtcMoveDownward(pCur, cd.pgno);
} else {
pCur->idx = midx + 1;
if (midx == nCells - 1) {
/* Move to right-most child */
pCur->idx = midx + 1;
tdbBtCursorMoveToChild(pCur, ((SBtPageHdr *)(pPage->pAmHdr))->rChild);
tdbBtcMoveDownward(pCur, ((SIntHdr *)pCur->pPage->pData)->pgno);
} else {
// TODO: reset cd as uninitialized
pCur->idx = midx + 1;
pCell = TDB_PAGE_CELL_AT(pPage, midx + 1);
pCell = tdbPageGetCell(pPage, pCur->idx);
tdbBtreeDecodeCell(pPage, pCell, &cd);
tdbBtCursorMoveToChild(pCur, cd.pgno);
tdbBtcMoveDownward(pCur, cd.pgno);
}
}
}
......@@ -299,32 +326,6 @@ static int tdbBtCursorMoveTo(SBtCursor *pCur, const void *pKey, int kLen, int *p
return 0;
}
static int tdbBtCursorMoveToRoot(SBtCursor *pCur) {
SBTree *pBt;
SPager *pPager;
SPage *pPage;
int ret;
pBt = pCur->pBt;
pPager = pBt->pPager;
// pPage = tdbPagerGet(pPager, pBt->root, true);
// if (pPage == NULL) {
// // TODO: handle error
// }
// ret = tdbInitBtPage(pPage, &pBtPage);
// if (ret < 0) {
// // TODO
// return 0;
// }
// pCur->pPage = pBtPage;
// pCur->iPage = 0;
return 0;
}
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) {
int mlen;
int cret;
......@@ -363,7 +364,7 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
}
// Try to create a new database
SBtreeZeroPageArg zArg = {.flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF, .pBt = pBt};
SBtreeInitPageArg zArg = {.flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF, .pBt = pBt};
ret = tdbPagerNewPage(pBt->pPager, &pgno, &pPage, tdbBtreeZeroPage, &zArg);
if (ret < 0) {
return -1;
......@@ -379,28 +380,19 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
static int tdbBtreeInitPage(SPage *pPage, void *arg) {
SBTree *pBt;
u16 flags;
u8 flags;
u8 isLeaf;
pBt = (SBTree *)arg;
flags = TDB_PAGE_FLAGS(pPage);
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
isLeaf = TDB_BTREE_PAGE_IS_LEAF(flags);
if (isLeaf) {
pPage->szAmHdr = 0;
} else {
pPage->szAmHdr = sizeof(SBtPageHdr);
}
pPage->pPageHdr = pPage->pData;
pPage->pAmHdr = pPage->pPageHdr + pPage->pPageMethods->szPageHdr;
pPage->pCellIdx = pPage->pAmHdr + pPage->szAmHdr;
pPage->pFreeStart = pPage->pCellIdx + pPage->pPageMethods->szOffset * TDB_PAGE_NCELLS(pPage);
pPage->pFreeEnd = pPage->pData + TDB_PAGE_CCELLS(pPage);
pPage->pPageFtr = (SPageFtr *)(pPage->pData + pPage->pageSize - sizeof(SPageFtr));
ASSERT(flags == TDB_BTREE_PAGE_GET_FLAGS(pPage));
tdbPageInit(pPage, isLeaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize);
TDB_BTREE_ASSERT_FLAG(flags);
// Init other fields
if (isLeaf) {
pPage->kLen = pBt->keyLen;
pPage->vLen = pBt->valLen;
......@@ -413,30 +405,38 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg) {
pPage->minLocal = pBt->minLocal;
}
// TODO: need to update the SPage.nFree
pPage->nFree = pPage->pFreeEnd - pPage->pFreeStart;
pPage->nOverflow = 0;
return 0;
}
static int tdbBtreeZeroPage(SPage *pPage, void *arg) {
u16 flags;
u8 flags;
SBTree *pBt;
u8 isLeaf;
flags = ((SBtreeZeroPageArg *)arg)->flags;
pBt = ((SBtreeZeroPageArg *)arg)->pBt;
flags = ((SBtreeInitPageArg *)arg)->flags;
pBt = ((SBtreeInitPageArg *)arg)->pBt;
isLeaf = TDB_BTREE_PAGE_IS_LEAF(flags);
pPage->pPageHdr = pPage->pData;
tdbPageZero(pPage, isLeaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize);
// Init the page header
TDB_PAGE_FLAGS_SET(pPage, flags);
TDB_PAGE_NCELLS_SET(pPage, 0);
TDB_PAGE_CCELLS_SET(pPage, pBt->pageSize - sizeof(SPageFtr));
TDB_PAGE_FCELL_SET(pPage, 0);
TDB_PAGE_NFREE_SET(pPage, 0);
if (isLeaf) {
SLeafHdr *pLeafHdr = (SLeafHdr *)(pPage->pData);
pLeafHdr->flags = flags;
tdbBtreeInitPage(pPage, (void *)pBt);
pPage->kLen = pBt->keyLen;
pPage->vLen = pBt->valLen;
pPage->maxLocal = pBt->maxLeaf;
pPage->minLocal = pBt->minLeaf;
} else {
SIntHdr *pIntHdr = (SIntHdr *)(pPage->pData);
pIntHdr->flags = flags;
pIntHdr->pgno = 0;
pPage->kLen = pBt->keyLen;
pPage->vLen = sizeof(SPgno);
pPage->maxLocal = pBt->maxLocal;
pPage->minLocal = pBt->minLocal;
}
return 0;
}
......@@ -452,55 +452,34 @@ typedef struct {
SPage *pNewPages[5];
} SBtreeBalanceHelper;
static int tdbBtreeCopyPageContent(SPage *pFrom, SPage *pTo) {
int nCells = TDB_PAGE_NCELLS(pFrom);
int cCells = TDB_PAGE_CCELLS(pFrom);
int fCell = TDB_PAGE_FCELL(pFrom);
int nFree = TDB_PAGE_NFREE(pFrom);
pTo->pFreeStart = pTo->pCellIdx + nCells * pFrom->pPageMethods->szOffset;
memcpy(pTo->pCellIdx, pFrom->pCellIdx, nCells * pFrom->pPageMethods->szOffset);
pTo->pFreeEnd = (u8 *)pTo->pPageFtr - (u8 *)(pFrom->pPageFtr) + pFrom->pFreeEnd;
memcpy(pTo->pFreeEnd, pFrom->pFreeEnd, (u8 *)pFrom->pPageFtr - pFrom->pFreeEnd);
TDB_PAGE_NCELLS_SET(pTo, nCells);
TDB_PAGE_CCELLS_SET(pTo, cCells);
TDB_PAGE_FCELL_SET(pTo, fCell);
TDB_PAGE_NFREE_SET(pTo, nFree);
// TODO: update other fields
return 0;
}
static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
SPager *pPager;
SPage *pChild;
SPgno pgnoChild;
int ret;
SBtreeZeroPageArg zArg;
u8 flags;
SIntHdr *pIntHdr;
SBtreeInitPageArg zArg;
u8 leaf;
pPager = pRoot->pPager;
flags = TDB_BTREE_PAGE_GET_FLAGS(pRoot);
leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
// Allocate a new child page
zArg.flags = TDB_BTREE_LEAF;
zArg.flags = TDB_FLAG_REMOVE(flags, TDB_BTREE_ROOT);
zArg.pBt = pBt;
ret = tdbPagerNewPage(pPager, &pgnoChild, &pChild, tdbBtreeZeroPage, &zArg);
if (ret < 0) {
return -1;
}
// Copy the root page content to the child page
ret = tdbBtreeCopyPageContent(pRoot, pChild);
if (ret < 0) {
return -1;
if (!leaf) {
((SIntHdr *)pChild->pData)->pgno = ((SIntHdr *)(pRoot->pData))->pgno;
}
pChild->nOverflow = pRoot->nOverflow;
for (int i = 0; i < pChild->nOverflow; i++) {
pChild->apOvfl[i] = pRoot->apOvfl[i];
pChild->aiOvfl[i] = pRoot->aiOvfl[i];
}
// Copy the root page content to the child page
tdbPageCopy(pRoot, pChild);
// Reinitialize the root page
zArg.flags = TDB_BTREE_ROOT;
......@@ -510,225 +489,328 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
return -1;
}
((SBtPageHdr *)pRoot->pAmHdr)[0].rChild = pgnoChild;
pIntHdr = (SIntHdr *)(pRoot->pData);
pIntHdr->pgno = pgnoChild;
*ppChild = pChild;
return 0;
}
static int tdbBtreeBalanceStep1(SBtreeBalanceHelper *pBlh) {
int nCells;
int i;
int idxStart;
int nChild;
int ret;
SPage *pParent;
SPgno pgno;
SCell *pCell;
SCellDecoder cd;
SBTree *pBt;
pParent = pBlh->pParent;
nCells = TDB_PAGE_NCELLS(pParent);
nChild = nCells + 1;
pBt = pBlh->pBt;
// TODO: ASSERT below needs to be removed
ASSERT(pParent->nOverflow == 0);
ASSERT(pBlh->idx <= nCells);
static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
int ret;
if (nChild < 3) {
idxStart = 0;
pBlh->nOld = nChild;
} else {
if (pBlh->idx == 0) {
idxStart = 0;
} else if (pBlh->idx == nCells) {
idxStart = pBlh->idx - 2;
int nOlds;
SPage *pOlds[3] = {0};
SCell *pDivCell[3] = {0};
int szDivCell[3];
int sIdx;
u8 childNotLeaf;
SPgno rPgno;
{ // Find 3 child pages at most to do balance
int nCells = TDB_PAGE_TOTAL_CELLS(pParent);
SCell *pCell;
if (nCells <= 2) {
sIdx = 0;
nOlds = nCells + 1;
} else {
idxStart = pBlh->idx - 1;
// has more than three child pages
if (idx == 0) {
sIdx = 0;
} else if (idx == nCells) {
sIdx = idx - 2;
} else {
sIdx = idx - 1;
}
nOlds = 3;
}
pBlh->nOld = 3;
}
for (int i = 0; i < nOlds; i++) {
ASSERT(sIdx + i <= nCells);
i = pBlh->nOld - 1;
SPgno pgno;
if (sIdx + i == nCells) {
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pParent)));
pgno = ((SIntHdr *)(pParent->pData))->pgno;
} else {
pCell = tdbPageGetCell(pParent, sIdx + i);
pgno = *(SPgno *)pCell;
}
if (idxStart + i == nCells) {
pgno = ((SBtPageHdr *)(pParent->pAmHdr))[0].rChild;
} else {
pCell = TDB_PAGE_CELL_AT(pParent, idxStart + i);
// TODO: no need to decode the payload part, and even the kLen, vLen part
// we only need the pgno part
ret = tdbBtreeDecodeCell(pParent, pCell, &cd);
if (ret < 0) {
ASSERT(0);
return -1;
}
pgno = cd.pgno;
}
for (;;) {
ret = tdbPagerFetchPage(pBt->pPager, pgno, &(pBlh->pOldPages[i]), tdbBtreeInitPage, pBt);
if (ret < 0) {
ASSERT(0);
return -1;
ret = tdbPagerFetchPage(pBt->pPager, pgno, pOlds + i, tdbBtreeInitPage, pBt);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
// copy the parent key out if child pages are not leaf page
childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]));
if (childNotLeaf) {
for (int i = 0; i < nOlds; i++) {
if (sIdx + i < TDB_PAGE_TOTAL_CELLS(pParent)) {
pCell = tdbPageGetCell(pParent, sIdx + i);
szDivCell[i] = tdbBtreeCellSize(pParent, pCell);
pDivCell[i] = malloc(szDivCell[i]);
memcpy(pDivCell[i], pCell, szDivCell[i]);
}
// Loop over
if ((i--) == 0) break;
{
// TODO
// ASSERT(0);
if (i < nOlds - 1) {
((SPgno *)pDivCell[i])[0] = ((SIntHdr *)pOlds[i]->pData)->pgno;
((SIntHdr *)pOlds[i]->pData)->pgno = 0;
tdbPageInsertCell(pOlds[i], TDB_PAGE_TOTAL_CELLS(pOlds[i]), pDivCell[i], szDivCell[i], 1);
}
}
rPgno = ((SIntHdr *)pOlds[nOlds - 1]->pData)->pgno;
}
}
return 0;
}
static int tdbBtreeBalanceStep2(SBtreeBalanceHelper *pBlh) {
#if 0
SPage *pPage;
int oidx;
int cidx;
int limit;
SCell *pCell;
for (int i = 0; i < pBlh->nOld; i++) {
pPage = pBlh->pOldPages[i];
oidx = 0;
cidx = 0;
if (oidx < pPage->nOverflow) {
limit = pPage->aiOvfl[oidx];
} else {
limit = pPage->pPageHdr->nCells;
// drop the cells on parent page
for (int i = 0; i < nOlds; i++) {
nCells = TDB_PAGE_TOTAL_CELLS(pParent);
if (sIdx < nCells) {
tdbPageDropCell(pParent, sIdx);
} else {
((SIntHdr *)pParent->pData)->pgno = 0;
}
}
}
// Loop to copy each cell pointer out
for (;;) {
if (oidx >= pPage->nOverflow && cidx >= pPage->pPageHdr->nCells) break;
if (cidx < limit) {
// Get local cells
pCell = TDB_PAGE_CELL_AT(pPage, cidx);
} else if (cidx == limit) {
// Get overflow cells
pCell = pPage->apOvfl[oidx++];
if (oidx < pPage->nOverflow) {
limit = pPage->aiOvfl[oidx];
} else {
limit = pPage->pPageHdr->nCells;
int nNews = 0;
struct {
int cnt;
int size;
int iPage;
int oIdx;
} infoNews[5] = {0};
{ // Get how many new pages are needed and the new distribution
// first loop to find minimum number of pages needed
for (int oPage = 0; oPage < nOlds; oPage++) {
SPage *pPage = pOlds[oPage];
SCell *pCell;
int cellBytes;
int oIdx;
for (oIdx = 0; oIdx < TDB_PAGE_TOTAL_CELLS(pPage); oIdx++) {
pCell = tdbPageGetCell(pPage, oIdx);
cellBytes = TDB_BYTES_CELL_TAKEN(pPage, pCell);
if (infoNews[nNews].size + cellBytes > TDB_PAGE_USABLE_SIZE(pPage)) {
// page is full, use a new page
nNews++;
ASSERT(infoNews[nNews].size + cellBytes <= TDB_PAGE_USABLE_SIZE(pPage));
if (childNotLeaf) {
// for non-child page, this cell is used as the right-most child,
// the divider cell to parent as well
continue;
}
}
} else {
ASSERT(0);
infoNews[nNews].cnt++;
infoNews[nNews].size += cellBytes;
infoNews[nNews].iPage = oPage;
infoNews[nNews].oIdx = oIdx;
}
}
{
// TODO: Copy divider cells here
}
}
nNews++;
/* TODO */
// back loop to make the distribution even
for (int iNew = nNews - 1; iNew > 0; iNew--) {
SCell *pCell;
int szLCell, szRCell;
#endif
return 0;
}
for (;;) {
pCell = tdbPageGetCell(pOlds[infoNews[iNew - 1].iPage], infoNews[iNew - 1].oIdx);
static int tdbBtreeBalanceStep3(SBtreeBalanceHelper *pBlh) {
// Figure out number of pages needed after balance
for (int i = 0; i < pBlh->nOld; i++) {
/* TODO */
}
if (childNotLeaf) {
szLCell = szRCell = tdbBtreeCellSize(pOlds[infoNews[iNew - 1].iPage], pCell);
} else {
szLCell = tdbBtreeCellSize(pOlds[infoNews[iNew - 1].iPage], pCell);
int iPage = infoNews[iNew - 1].iPage;
int oIdx = infoNews[iNew - 1].oIdx + 1;
SPage *pPage;
for (;;) {
pPage = pOlds[iPage];
if (oIdx < TDB_PAGE_TOTAL_CELLS(pPage)) {
break;
}
iPage++;
oIdx = 0;
}
return 0;
}
pCell = tdbPageGetCell(pPage, oIdx);
szRCell = tdbBtreeCellSize(pPage, pCell);
}
static int tdbBtreeBalanceStep4(SBtreeBalanceHelper *pBlh) {
// TODO
return 0;
}
ASSERT(infoNews[iNew - 1].cnt > 0);
static int tdbBtreeBalanceStep5(SBtreeBalanceHelper *pBlh) {
// TODO
return 0;
}
if (infoNews[iNew].size + szRCell >= infoNews[iNew - 1].size - szRCell) {
break;
}
static int tdbBtreeBalanceStep6(SBtreeBalanceHelper *pBlh) {
// TODO
return 0;
}
// Move a cell right forward
infoNews[iNew - 1].cnt--;
infoNews[iNew - 1].size -= szLCell;
infoNews[iNew - 1].oIdx--;
for (;;) {
if (infoNews[iNew - 1].oIdx >= 0) {
break;
}
static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
int ret;
SBtreeBalanceHelper blh;
infoNews[iNew - 1].iPage--;
infoNews[iNew - 1].oIdx = TDB_PAGE_TOTAL_CELLS(pOlds[infoNews[iNew - 1].iPage]) - 1;
}
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(TDB_PAGE_FLAGS(pParent)));
infoNews[iNew].cnt++;
infoNews[iNew].size += szRCell;
}
}
}
blh.pBt = pBt;
blh.pParent = pParent;
blh.idx = idx;
SPage *pNews[5] = {0};
{ // Allocate new pages, reuse the old page when possible
// Step 1: find two sibling pages and get engough info about the old pages
ret = tdbBtreeBalanceStep1(&blh);
if (ret < 0) {
ASSERT(0);
return -1;
}
SPgno pgno;
SBtreeInitPageArg iarg;
u8 flags;
// Step 2: Load all cells on the old page and the divider cells
ret = tdbBtreeBalanceStep2(&blh);
if (ret < 0) {
ASSERT(0);
return -1;
}
flags = TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]);
// Step 3: Get the number of pages needed to hold all cells
ret = tdbBtreeBalanceStep3(&blh);
if (ret < 0) {
ASSERT(0);
return -1;
}
for (int iNew = 0; iNew < nNews; iNew++) {
if (iNew < nOlds) {
pNews[iNew] = pOlds[iNew];
} else {
iarg.pBt = pBt;
iarg.flags = flags;
ret = tdbPagerNewPage(pBt->pPager, &pgno, pNews + iNew, tdbBtreeZeroPage, &iarg);
if (ret < 0) {
ASSERT(0);
}
}
}
// Step 4: Allocate enough new pages. Reuse old pages as much as possible
ret = tdbBtreeBalanceStep4(&blh);
if (ret < 0) {
ASSERT(0);
return -1;
// TODO: sort the page according to the page number
}
// Step 5: Insert new divider cells into pParent
ret = tdbBtreeBalanceStep5(&blh);
if (ret < 0) {
ASSERT(0);
return -1;
}
{ // Do the real cell distribution
SPage *pOldsCopy[3] = {0};
SCell *pCell;
int szCell;
SBtreeInitPageArg iarg;
int iNew, nNewCells;
SCellDecoder cd;
iarg.pBt = pBt;
iarg.flags = TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]);
for (int i = 0; i < nOlds; i++) {
tdbPageCreate(pOlds[0]->pageSize, &pOldsCopy[i], NULL, NULL);
tdbBtreeZeroPage(pOldsCopy[i], &iarg);
tdbPageCopy(pOlds[i], pOldsCopy[i]);
}
iNew = 0;
nNewCells = 0;
tdbBtreeZeroPage(pNews[iNew], &iarg);
for (int iOld = 0; iOld < nOlds; iOld++) {
SPage *pPage;
pPage = pOldsCopy[iOld];
for (int oIdx = 0; oIdx < TDB_PAGE_TOTAL_CELLS(pPage); oIdx++) {
pCell = tdbPageGetCell(pPage, oIdx);
szCell = tdbBtreeCellSize(pPage, pCell);
ASSERT(nNewCells <= infoNews[iNew].cnt);
ASSERT(iNew < nNews);
if (nNewCells < infoNews[iNew].cnt) {
tdbPageInsertCell(pNews[iNew], nNewCells, pCell, szCell, 0);
nNewCells++;
// insert parent page
if (!childNotLeaf && nNewCells == infoNews[iNew].cnt) {
SIntHdr *pIntHdr = (SIntHdr *)pParent->pData;
if (iNew == nNews - 1 && pIntHdr->pgno == 0) {
pIntHdr->pgno = TDB_PAGE_PGNO(pNews[iNew]);
} else {
tdbBtreeDecodeCell(pPage, pCell, &cd);
// TODO: pCell here may be inserted as an overflow cell, handle it
SCell *pNewCell = malloc(cd.kLen + 9);
int szNewCell;
SPgno pgno;
pgno = TDB_PAGE_PGNO(pNews[iNew]);
tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell);
tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0);
free(pNewCell);
}
// move to next new page
iNew++;
nNewCells = 0;
if (iNew < nNews) {
tdbBtreeZeroPage(pNews[iNew], &iarg);
}
}
} else {
ASSERT(childNotLeaf);
ASSERT(iNew < nNews - 1);
// set current new page right-most child
((SIntHdr *)pNews[iNew]->pData)->pgno = ((SPgno *)pCell)[0];
// insert to parent as divider cell
ASSERT(iNew < nNews - 1);
((SPgno *)pCell)[0] = TDB_PAGE_PGNO(pNews[iNew]);
tdbPageInsertCell(pParent, sIdx++, pCell, szCell, 0);
// move to next new page
iNew++;
nNewCells = 0;
if (iNew < nNews) {
tdbBtreeZeroPage(pNews[iNew], &iarg);
}
}
}
}
// Step 6: Update the sibling pages
ret = tdbBtreeBalanceStep6(&blh);
if (ret < 0) {
ASSERT(0);
return -1;
}
if (childNotLeaf) {
ASSERT(TDB_PAGE_TOTAL_CELLS(pNews[nNews - 1]) == infoNews[nNews - 1].cnt);
((SIntHdr *)(pNews[nNews - 1]->pData))->pgno = rPgno;
{
// TODO: Reset states
SIntHdr *pIntHdr = (SIntHdr *)pParent->pData;
if (pIntHdr->pgno == 0) {
pIntHdr->pgno = TDB_PAGE_PGNO(pNews[nNews - 1]);
} else {
((SPgno *)pDivCell[nOlds - 1])[0] = TDB_PAGE_PGNO(pNews[nNews - 1]);
tdbPageInsertCell(pParent, sIdx, pDivCell[nOlds - 1], szDivCell[nOlds - 1], 0);
}
}
for (int i = 0; i < nOlds; i++) {
tdbPageDestroy(pOldsCopy[i], NULL, NULL);
}
}
{
// TODO: Clear resources
for (int i = 0; i < 3; i++) {
if (pDivCell[i]) {
free(pDivCell[i]);
}
}
return 0;
}
static int tdbBtreeBalance(SBtCursor *pCur) {
static int tdbBtreeBalance(SBTC *pCur) {
int iPage;
SPage *pParent;
SPage *pPage;
int ret;
u16 flags;
u8 flags;
u8 leaf;
u8 root;
......@@ -736,17 +818,10 @@ static int tdbBtreeBalance(SBtCursor *pCur) {
for (;;) {
iPage = pCur->iPage;
pPage = pCur->pPage;
flags = TDB_PAGE_FLAGS(pPage);
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
root = TDB_BTREE_PAGE_IS_ROOT(flags);
// TODO: Get the page free space if not get yet
// if (pPage->nFree < 0) {
// if (tdbBtreeComputeFreeSpace(pPage) < 0) {
// return -1;
// }
// }
// when the page is not overflow and not too empty, the balance work
// is finished. Just break out the balance loop.
if (pPage->nOverflow == 0 /* TODO: && pPage->nFree <= */) {
......@@ -758,7 +833,7 @@ static int tdbBtreeBalance(SBtCursor *pCur) {
// ignore the case of empty
if (pPage->nOverflow == 0) break;
ret = tdbBtreeBalanceDeeper(pCur->pBt, pCur->pPage, &(pCur->pgStack[1]));
ret = tdbBtreeBalanceDeeper(pCur->pBt, pPage, &(pCur->pgStack[1]));
if (ret < 0) {
return -1;
}
......@@ -817,9 +892,10 @@ static int tdbBtreeEncodePayload(SPage *pPage, u8 *pPayload, const void *pKey, i
return 0;
}
// TODO: allow vLen = 0
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
int *szCell) {
u16 flags;
u8 flags;
u8 leaf;
int nHeader;
int nPayload;
......@@ -830,10 +906,18 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
nPayload = 0;
nHeader = 0;
flags = TDB_PAGE_FLAGS(pPage);
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
// 1. Encode Header part
/* Encode SPgno if interior page */
if (!leaf) {
ASSERT(pPage->vLen == sizeof(SPgno));
((SPgno *)(pCell + nHeader))[0] = ((SPgno *)pVal)[0];
nHeader = nHeader + sizeof(SPgno);
}
/* Encode kLen if need */
if (pPage->kLen == TDB_VARIANT_LEN) {
nHeader += tdbPutVarInt(pCell + nHeader, kLen);
......@@ -844,14 +928,6 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
nHeader += tdbPutVarInt(pCell + nHeader, vLen);
}
/* Encode SPgno if interior page */
if (!leaf) {
ASSERT(pPage->vLen == sizeof(SPgno));
((SPgno *)(pCell + nHeader))[0] = ((SPgno *)pVal)[0];
nHeader = nHeader + sizeof(SPgno);
}
// 2. Encode payload part
if (leaf) {
ret = tdbBtreeEncodePayload(pPage, pCell + nHeader, pKey, kLen, pVal, vLen, &nPayload);
......@@ -893,13 +969,13 @@ static int tdbBtreeDecodePayload(SPage *pPage, const u8 *pPayload, SCellDecoder
}
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder) {
u16 flags;
u8 flags;
u8 leaf;
int nHeader;
int ret;
nHeader = 0;
flags = TDB_PAGE_FLAGS(pPage);
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
// Clear the state of decoder
......@@ -910,6 +986,14 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
pDecoder->pgno = 0;
// 1. Decode header part
if (!leaf) {
ASSERT(pPage->vLen == sizeof(SPgno));
pDecoder->pgno = ((SPgno *)(pCell + nHeader))[0];
pDecoder->pVal = (u8 *)(&(pDecoder->pgno));
nHeader = nHeader + sizeof(SPgno);
}
if (pPage->kLen == TDB_VARIANT_LEN) {
nHeader += tdbGetVarInt(pCell + nHeader, &(pDecoder->kLen));
} else {
......@@ -922,14 +1006,6 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
pDecoder->vLen = pPage->vLen;
}
if (!leaf) {
ASSERT(pPage->vLen == sizeof(SPgno));
pDecoder->pgno = ((SPgno *)(pCell + nHeader))[0];
pDecoder->pVal = (u8 *)(&(pDecoder->pgno));
nHeader = nHeader + sizeof(SPgno);
}
// 2. Decode payload part
ret = tdbBtreeDecodePayload(pPage, pCell + nHeader, pDecoder);
if (ret < 0) {
......@@ -939,4 +1015,313 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
return 0;
}
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) {
u8 flags;
u8 isLeaf;
int szCell;
int kLen = 0, vLen = 0;
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
isLeaf = TDB_BTREE_PAGE_IS_LEAF(flags);
szCell = 0;
if (!isLeaf) {
szCell += sizeof(SPgno);
}
if (pPage->kLen == TDB_VARIANT_LEN) {
szCell += tdbGetVarInt(pCell + szCell, &kLen);
} else {
kLen = pPage->kLen;
}
if (isLeaf) {
if (pPage->vLen == TDB_VARIANT_LEN) {
szCell += tdbGetVarInt(pCell + szCell, &vLen);
} else {
vLen = pPage->vLen;
}
}
szCell = szCell + kLen + vLen;
return szCell;
}
#endif
int tdbBtcOpen(SBTC *pCur, SBTree *pBt) {
pCur->pBt = pBt;
pCur->iPage = -1;
pCur->pPage = NULL;
pCur->idx = -1;
return 0;
}
int tdbBtcMoveToFirst(SBTC *pBtc) {
int ret;
SBTree *pBt;
SPager *pPager;
u8 flags;
SCell *pCell;
SPgno pgno;
pBt = pBtc->pBt;
pPager = pBt->pPager;
if (pBtc->iPage < 0) {
// move a clean cursor
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt);
if (ret < 0) {
ASSERT(0);
return -1;
}
pBtc->iPage = 0;
pBtc->idx = 0;
} else {
// move from a position
ASSERT(0);
}
// move downward
for (;;) {
flags = TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage);
if (TDB_BTREE_PAGE_IS_LEAF(flags)) break;
pCell = tdbPageGetCell(pBtc->pPage, 0);
pgno = *(SPgno *)pCell;
ret = tdbBtcMoveDownward(pBtc, pgno);
if (ret < 0) {
ASSERT(0);
return -1;
}
pBtc->idx = 0;
}
return 0;
}
int tdbBtcMoveToLast(SBTC *pBtc) {
int ret;
SBTree *pBt;
SPager *pPager;
u8 flags;
SPgno pgno;
pBt = pBtc->pBt;
pPager = pBt->pPager;
if (pBtc->iPage < 0) {
// move a clean cursor
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt);
if (ret < 0) {
ASSERT(0);
return -1;
}
pBtc->iPage = 0;
} else {
// move from a position
ASSERT(0);
}
// move downward
for (;;) {
flags = TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage);
if (TDB_BTREE_PAGE_IS_LEAF(flags)) {
// TODO: handle empty case
ASSERT(TDB_PAGE_TOTAL_CELLS(pBtc->pPage) > 0);
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage) - 1;
break;
} else {
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
pgno = ((SIntHdr *)pBtc->pPage->pData)->pgno;
ret = tdbBtcMoveDownward(pBtc, pgno);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
}
return 0;
}
int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen) {
// TODO
return 0;
}
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
SCell *pCell;
SCellDecoder cd;
void *pKey, *pVal;
int ret;
if (pBtc->idx < 0) {
return -1;
}
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd);
pKey = TDB_REALLOC(*ppKey, cd.kLen);
if (pKey == NULL) {
return -1;
}
// TODO: vLen may be zero
pVal = TDB_REALLOC(*ppVal, cd.vLen);
if (pVal == NULL) {
TDB_FREE(pKey);
return -1;
}
*ppKey = pKey;
*ppVal = pVal;
*kLen = cd.kLen;
*vLen = cd.vLen;
memcpy(pKey, cd.pKey, cd.kLen);
memcpy(pVal, cd.pVal, cd.vLen);
ret = tdbBtcMoveToNext(pBtc);
return 0;
}
static int tdbBtcMoveToNext(SBTC *pBtc) {
int nCells;
SPgno pgno;
SCell *pCell;
u8 flags;
ASSERT(TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage)));
if (pBtc->idx < 0) return -1;
pBtc->idx++;
if (pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
return 0;
}
if (pBtc->iPage == 0) {
pBtc->idx = -1;
return 0;
}
// Move upward
for (;;) {
tdbBtcMoveUpward(pBtc);
pBtc->idx++;
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
if (pBtc->idx <= nCells) {
break;
}
if (pBtc->iPage == 0) {
pBtc->idx = -1;
return 0;
}
}
// Move downward
for (;;) {
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
if (pBtc->idx < nCells) {
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
pgno = *(SPgno *)pCell;
} else {
pgno = ((SIntHdr *)pBtc->pPage->pData)->pgno;
}
tdbBtcMoveDownward(pBtc, pgno);
pBtc->idx = 0;
flags = TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage);
if (TDB_BTREE_PAGE_IS_LEAF(flags)) {
break;
}
}
return 0;
}
int tdbBtcClose(SBTC *pBtc) {
// TODO
return 0;
}
static int tdbBtcMoveDownward(SBTC *pCur, SPgno pgno) {
int ret;
pCur->pgStack[pCur->iPage] = pCur->pPage;
pCur->idxStack[pCur->iPage] = pCur->idx;
pCur->iPage++;
pCur->pPage = NULL;
pCur->idx = -1;
ret = tdbPagerFetchPage(pCur->pBt->pPager, pgno, &pCur->pPage, tdbBtreeInitPage, pCur->pBt);
if (ret < 0) {
ASSERT(0);
}
return 0;
}
static int tdbBtcMoveUpward(SBTC *pBtc) {
if (pBtc->iPage == 0) return -1;
// tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage);
pBtc->iPage--;
pBtc->pPage = pBtc->pgStack[pBtc->iPage];
pBtc->idx = pBtc->idxStack[pBtc->iPage];
return 0;
}
#ifndef NODEBUG
typedef struct {
SPgno pgno;
u8 root;
u8 leaf;
SPgno rChild;
int nCells;
int nOvfl;
} SBtPageInfo;
SBtPageInfo btPageInfos[20];
void tdbBtPageInfo(SPage *pPage, int idx) {
u8 flags;
SBtPageInfo *pBtPageInfo;
pBtPageInfo = btPageInfos + idx;
pBtPageInfo->pgno = TDB_PAGE_PGNO(pPage);
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
pBtPageInfo->root = TDB_BTREE_PAGE_IS_ROOT(flags);
pBtPageInfo->leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
pBtPageInfo->rChild = 0;
if (!pBtPageInfo->leaf) {
pBtPageInfo->rChild = *(SPgno *)(pPage->pData + 1);
}
pBtPageInfo->nCells = TDB_PAGE_TOTAL_CELLS(pPage) - pPage->nOverflow;
pBtPageInfo->nOvfl = pPage->nOverflow;
}
#endif
\ No newline at end of file
......@@ -15,13 +15,17 @@
#include "tdbInt.h"
struct STDb {
struct STDB {
STEnv *pEnv;
SBTree *pBt;
};
int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprFn, STEnv *pEnv, STDb **ppDb) {
STDb *pDb;
struct STDBC {
SBTC btc;
};
int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprFn, STEnv *pEnv, STDB **ppDb) {
STDB *pDb;
SPager *pPager;
int ret;
char fFullName[TDB_FILENAME_LEN];
......@@ -30,7 +34,7 @@ int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprF
*ppDb = NULL;
pDb = (STDb *)taosMemoryCalloc(1, sizeof(*pDb));
pDb = (STDB *)calloc(1, sizeof(*pDb));
if (pDb == NULL) {
return -1;
}
......@@ -59,23 +63,23 @@ int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprF
return 0;
}
int tdbDbClose(STDb *pDb) {
int tdbDbClose(STDB *pDb) {
// TODO
return 0;
}
int tdbDbDrop(STDb *pDb) {
int tdbDbDrop(STDB *pDb) {
// TODO
return 0;
}
int tdbDbInsert(STDb *pDb, const void *pKey, int keyLen, const void *pVal, int valLen) {
SBtCursor btc;
SBtCursor *pCur;
int ret;
int tdbDbInsert(STDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen) {
SBTC btc;
SBTC *pCur;
int ret;
pCur = &btc;
ret = tdbBtreeCursor(pCur, pDb->pBt);
ret = tdbBtcOpen(pCur, pDb->pBt);
if (ret < 0) {
return -1;
}
......@@ -85,5 +89,45 @@ int tdbDbInsert(STDb *pDb, const void *pKey, int keyLen, const void *pVal, int v
return -1;
}
return 0;
}
int tdbDbGet(STDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {
return tdbBtreeGet(pDb->pBt, pKey, kLen, ppVal, vLen);
}
int tdbDbcOpen(STDB *pDb, STDBC **ppDbc) {
int ret;
STDBC *pDbc = NULL;
*ppDbc = NULL;
pDbc = malloc(sizeof(*pDbc));
if (pDbc == NULL) {
return -1;
}
tdbBtcOpen(&pDbc->btc, pDb->pBt);
// TODO: move to first now, we can move to any key-value
// and in any direction, design new APIs.
ret = tdbBtcMoveToFirst(&pDbc->btc);
if (ret < 0) {
ASSERT(0);
return -1;
}
*ppDbc = pDbc;
return 0;
}
int tdbDbNext(STDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
return tdbBtreeNext(&pDbc->btc, ppKey, kLen, ppVal, vLen);
}
int tdbDbcClose(STDBC *pDbc) {
if (pDbc) {
free(pDbc);
}
return 0;
}
\ No newline at end of file
......@@ -27,7 +27,7 @@ int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, STEnv **ppEnv)
dsize = strlen(rootDir);
zsize = sizeof(*pEnv) + dsize * 2 + strlen(TDB_JOURNAL_NAME) + 3;
pPtr = (uint8_t *)taosMemoryCalloc(1, zsize);
pPtr = (uint8_t *)calloc(1, zsize);
if (pPtr == NULL) {
return -1;
}
......
......@@ -17,7 +17,7 @@
struct SPCache {
int pageSize;
int cacheSize;
TdThreadMutex mutex;
pthread_mutex_t mutex;
int nFree;
SPage *pFree;
int nPage;
......@@ -53,19 +53,17 @@ static void tdbPCacheLock(SPCache *pCache);
static void tdbPCacheUnlock(SPCache *pCache);
static bool tdbPCacheLocked(SPCache *pCache);
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNewPage);
static void tdbPCachePinPage(SPage *pPage);
static void tdbPCacheRemovePageFromHash(SPage *pPage);
static void tdbPCacheAddPageToHash(SPage *pPage);
static void tdbPCacheUnpinPage(SPage *pPage);
static void *tdbOsMalloc(void *arg, size_t size);
static void tdbOsFree(void *arg, void *ptr);
static void tdbPCachePinPage(SPCache *pCache, SPage *pPage);
static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage);
static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage);
static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage);
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
SPCache *pCache;
void *pPtr;
SPage *pPgHdr;
pCache = (SPCache *)taosMemoryCalloc(1, sizeof(*pCache));
pCache = (SPCache *)calloc(1, sizeof(*pCache));
if (pCache == NULL) {
return -1;
}
......@@ -74,7 +72,7 @@ int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
pCache->cacheSize = cacheSize;
if (tdbPCacheOpenImpl(pCache) < 0) {
taosMemoryFree(pCache);
free(pCache);
return -1;
}
......@@ -102,7 +100,7 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, bool alcNewPage) {
return pPage;
}
void tdbPCacheRelease(SPage *pPage) {
void tdbPCacheRelease(SPCache *pCache, SPage *pPage) {
i32 nRef;
nRef = TDB_UNREF_PAGE(pPage);
......@@ -110,7 +108,7 @@ void tdbPCacheRelease(SPage *pPage) {
if (nRef == 0) {
if (1 /*TODO: page still clean*/) {
tdbPCacheUnpinPage(pPage);
tdbPCacheUnpinPage(pCache, pPage);
} else {
// TODO
ASSERT(0);
......@@ -118,13 +116,13 @@ void tdbPCacheRelease(SPage *pPage) {
}
}
static void tdbPCacheInitLock(SPCache *pCache) { taosThreadMutexInit(&(pCache->mutex), NULL); }
static void tdbPCacheInitLock(SPCache *pCache) { pthread_mutex_init(&(pCache->mutex), NULL); }
static void tdbPCacheClearLock(SPCache *pCache) { taosThreadMutexDestroy(&(pCache->mutex)); }
static void tdbPCacheClearLock(SPCache *pCache) { pthread_mutex_destroy(&(pCache->mutex)); }
static void tdbPCacheLock(SPCache *pCache) { taosThreadMutexLock(&(pCache->mutex)); }
static void tdbPCacheLock(SPCache *pCache) { pthread_mutex_lock(&(pCache->mutex)); }
static void tdbPCacheUnlock(SPCache *pCache) { taosThreadMutexUnlock(&(pCache->mutex)); }
static void tdbPCacheUnlock(SPCache *pCache) { pthread_mutex_unlock(&(pCache->mutex)); }
static bool tdbPCacheLocked(SPCache *pCache) {
assert(0);
......@@ -144,7 +142,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNe
if (pPage || !alcNewPage) {
if (pPage) {
tdbPCachePinPage(pPage);
tdbPCachePinPage(pCache, pPage);
}
return pPage;
}
......@@ -160,8 +158,8 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNe
// 3. Try to Recycle a page
if (!pPage && !pCache->lru.pLruPrev->isAnchor) {
pPage = pCache->lru.pLruPrev;
tdbPCacheRemovePageFromHash(pPage);
tdbPCachePinPage(pPage);
tdbPCacheRemovePageFromHash(pCache, pPage);
tdbPCachePinPage(pCache, pPage);
}
// 4. Try a stress allocation (TODO)
......@@ -173,16 +171,13 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNe
memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid));
pPage->pLruNext = NULL;
pPage->pPager = NULL;
tdbPCacheAddPageToHash(pPage);
tdbPCacheAddPageToHash(pCache, pPage);
}
return pPage;
}
static void tdbPCachePinPage(SPage *pPage) {
SPCache *pCache;
pCache = pPage->pCache;
static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
if (!PAGE_IS_PINNED(pPage)) {
pPage->pLruPrev->pLruNext = pPage->pLruNext;
pPage->pLruNext->pLruPrev = pPage->pLruPrev;
......@@ -192,11 +187,8 @@ static void tdbPCachePinPage(SPage *pPage) {
}
}
static void tdbPCacheUnpinPage(SPage *pPage) {
SPCache *pCache;
i32 nRef;
pCache = pPage->pCache;
static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
i32 nRef;
tdbPCacheLock(pCache);
......@@ -217,12 +209,10 @@ static void tdbPCacheUnpinPage(SPage *pPage) {
tdbPCacheUnlock(pCache);
}
static void tdbPCacheRemovePageFromHash(SPage *pPage) {
SPCache *pCache;
SPage **ppPage;
int h;
static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
SPage **ppPage;
int h;
pCache = pPage->pCache;
h = PCACHE_PAGE_HASH(&(pPage->pgid));
for (ppPage = &(pCache->pgHash[h % pCache->nHash]); *ppPage != pPage; ppPage = &((*ppPage)->pHashNext))
;
......@@ -232,11 +222,9 @@ static void tdbPCacheRemovePageFromHash(SPage *pPage) {
pCache->nPage--;
}
static void tdbPCacheAddPageToHash(SPage *pPage) {
SPCache *pCache;
int h;
static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
int h;
pCache = pPage->pCache;
h = PCACHE_PAGE_HASH(&(pPage->pgid)) % pCache->nHash;
pPage->pHashNext = pCache->pgHash[h];
......@@ -257,7 +245,7 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
pCache->nFree = 0;
pCache->pFree = NULL;
for (int i = 0; i < pCache->cacheSize; i++) {
ret = tdbPageCreate(pCache->pageSize, &pPage, tdbOsMalloc, NULL);
ret = tdbPageCreate(pCache->pageSize, &pPage, NULL, NULL);
if (ret < 0) {
// TODO: handle error
return -1;
......@@ -266,7 +254,6 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
// pPage->pgid = 0;
pPage->isAnchor = 0;
pPage->isLocalPage = 1;
pPage->pCache = pCache;
TDB_INIT_PAGE_REF(pPage);
pPage->pHashNext = NULL;
pPage->pLruNext = NULL;
......@@ -281,7 +268,7 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
// Open the hash table
pCache->nPage = 0;
pCache->nHash = pCache->cacheSize;
pCache->pgHash = (SPage **)taosMemoryCalloc(pCache->nHash, sizeof(SPage *));
pCache->pgHash = (SPage **)calloc(pCache->nHash, sizeof(SPage *));
if (pCache->pgHash == NULL) {
// TODO
return -1;
......@@ -297,13 +284,3 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
}
int tdbPCacheGetPageSize(SPCache *pCache) { return pCache->pageSize; }
static void *tdbOsMalloc(void *arg, size_t size) {
void *ptr;
ptr = taosMemoryMalloc(size);
return ptr;
}
static void tdbOsFree(void *arg, void *ptr) { taosMemoryFree(ptr); }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
typedef struct __attribute__((__packed__)) {
u8 szCell[2];
u8 nxOffset[2];
} SFreeCell;
typedef struct __attribute__((__packed__)) {
u8 szCell[3];
u8 nxOffset[3];
} SFreeCellL;
/* For small page */
#define TDB_SPAGE_FREE_CELL_SIZE_PTR(PCELL) (((SFreeCell *)(PCELL))->szCell)
#define TDB_SPAGE_FREE_CELL_NXOFFSET_PTR(PCELL) (((SFreeCell *)(PCELL))->nxOffset)
#define TDB_SPAGE_FREE_CELL_SIZE(PCELL) ((u16 *)TDB_SPAGE_FREE_CELL_SIZE_PTR(PCELL))[0]
#define TDB_SPAGE_FREE_CELL_NXOFFSET(PCELL) ((u16 *)TDB_SPAGE_FREE_CELL_NXOFFSET_PTR(PCELL))[0]
#define TDB_SPAGE_FREE_CELL_SIZE_SET(PCELL, SIZE) (TDB_SPAGE_FREE_CELL_SIZE(PCELL) = (SIZE))
#define TDB_SPAGE_FREE_CELL_NXOFFSET_SET(PCELL, OFFSET) (TDB_SPAGE_FREE_CELL_NXOFFSET(PCELL) = (OFFSET))
/* For large page */
#define TDB_LPAGE_FREE_CELL_SIZE_PTR(PCELL) (((SFreeCellL *)(PCELL))->szCell)
#define TDB_LPAGE_FREE_CELL_NXOFFSET_PTR(PCELL) (((SFreeCellL *)(PCELL))->nxOffset)
#define TDB_LPAGE_FREE_CELL_SIZE(PCELL) TDB_GET_U24(TDB_LPAGE_FREE_CELL_SIZE_PTR(PCELL))
#define TDB_LPAGE_FREE_CELL_NXOFFSET(PCELL) TDB_GET_U24(TDB_LPAGE_FREE_CELL_NXOFFSET_PTR(PCELL))
#define TDB_LPAGE_FREE_CELL_SIZE_SET(PCELL, SIZE) TDB_PUT_U24(TDB_LPAGE_FREE_CELL_SIZE_PTR(PCELL), SIZE)
#define TDB_LPAGE_FREE_CELL_NXOFFSET_SET(PCELL, OFFSET) TDB_PUT_U24(TDB_LPAGE_FREE_CELL_NXOFFSET_PTR(PCELL), OFFSET)
/* For page */
#define TDB_PAGE_FREE_CELL_SIZE_PTR(PPAGE, PCELL) \
(TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_FREE_CELL_SIZE_PTR(PCELL) : TDB_SPAGE_FREE_CELL_SIZE_PTR(PCELL))
#define TDB_PAGE_FREE_CELL_NXOFFSET_PTR(PPAGE, PCELL) \
(TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_FREE_CELL_NXOFFSET_PTR(PCELL) : TDB_SPAGE_FREE_CELL_NXOFFSET_PTR(PCELL))
#define TDB_PAGE_FREE_CELL_SIZE(PPAGE, PCELL) \
(TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_FREE_CELL_SIZE(PCELL) : TDB_SPAGE_FREE_CELL_SIZE(PCELL))
#define TDB_PAGE_FREE_CELL_NXOFFSET(PPAGE, PCELL) \
(TDB_IS_LARGE_PAGE(pPage) ? TDB_LPAGE_FREE_CELL_NXOFFSET(PCELL) : TDB_SPAGE_FREE_CELL_NXOFFSET(PCELL))
#define TDB_PAGE_FREE_CELL_SIZE_SET(PPAGE, PCELL, SIZE) \
do { \
if (TDB_IS_LARGE_PAGE(PPAGE)) { \
TDB_LPAGE_FREE_CELL_SIZE_SET(PCELL, SIZE); \
} else { \
TDB_SPAGE_FREE_CELL_SIZE_SET(PCELL, SIZE); \
} \
} while (0)
#define TDB_PAGE_FREE_CELL_NXOFFSET_SET(PPAGE, PCELL, OFFSET) \
do { \
if (TDB_IS_LARGE_PAGE(PPAGE)) { \
TDB_LPAGE_FREE_CELL_NXOFFSET_SET(PCELL, OFFSET); \
} else { \
TDB_SPAGE_FREE_CELL_NXOFFSET_SET(PCELL, OFFSET); \
} \
} while (0)
static int tdbPageAllocate(SPage *pPage, int size, SCell **ppCell);
static int tdbPageDefragment(SPage *pPage);
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg) {
SPage *pPage;
u8 *ptr;
int size;
ASSERT(TDB_IS_PGSIZE_VLD(pageSize));
*ppPage = NULL;
size = pageSize + sizeof(*pPage);
ptr = (u8 *)((*xMalloc)(arg, size));
if (pPage == NULL) {
return -1;
}
memset(ptr, 0, size);
pPage = (SPage *)(ptr + pageSize);
pPage->pData = ptr;
pPage->pageSize = pageSize;
if (pageSize < 65536) {
pPage->szOffset = 2;
pPage->szPageHdr = sizeof(SPageHdr);
pPage->szFreeCell = sizeof(SFreeCell);
} else {
pPage->szOffset = 3;
pPage->szPageHdr = sizeof(SPageHdrL);
pPage->szFreeCell = sizeof(SFreeCellL);
}
TDB_INIT_PAGE_LOCK(pPage);
/* TODO */
*ppPage = pPage;
return 0;
}
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) {
u8 *ptr;
ptr = pPage->pData;
(*xFree)(arg, ptr);
return 0;
}
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) {
int ret;
SCell *pTarget;
u8 *pTmp;
int j;
if (pPage->nOverflow || szCell + pPage->szOffset > pPage->nFree) {
// TODO: need to figure out if pCell may be used by outside of this function
j = pPage->nOverflow++;
pPage->apOvfl[j] = pCell;
pPage->aiOvfl[j] = idx;
} else {
ret = tdbPageAllocate(pPage, szCell, &pTarget);
if (ret < 0) {
return -1;
}
memcpy(pTarget, pCell, szCell);
pTmp = pPage->pCellIdx + idx * pPage->szOffset;
memmove(pTmp + pPage->szOffset, pTmp, pPage->pFreeStart - pTmp - pPage->szOffset);
TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, pTarget - pPage->pData);
TDB_PAGE_NCELLS_SET(pPage, TDB_PAGE_NCELLS(pPage) + 1);
}
return 0;
}
int tdbPageDropCell(SPage *pPage, int idx) {
// TODO
return 0;
}
static int tdbPageAllocate(SPage *pPage, int size, SCell **ppCell) {
SCell *pCell;
SFreeCell *pFreeCell;
u8 *pOffset;
int ret;
ASSERT(pPage->nFree > size + pPage->szOffset);
pCell = NULL;
*ppCell = NULL;
// 1. Try to allocate from the free space area
if (pPage->pFreeEnd - pPage->pFreeStart > size + pPage->szOffset) {
pPage->pFreeEnd -= size;
pPage->pFreeStart += pPage->szOffset;
pCell = pPage->pFreeEnd;
}
// 2. Try to allocate from the page free list
if ((pCell == NULL) && (pPage->pFreeEnd - pPage->pFreeStart >= pPage->szOffset) && TDB_PAGE_FCELL(pPage)) {
int szCell;
int nxOffset;
pCell = pPage->pData + TDB_PAGE_FCELL(pPage);
pOffset = TDB_IS_LARGE_PAGE(pPage) ? ((SPageHdrL *)(pPage->pPageHdr))[0].fCell
: (u8 *)&(((SPageHdr *)(pPage->pPageHdr))[0].fCell);
szCell = TDB_PAGE_FREE_CELL_SIZE(pPage, pCell);
nxOffset = TDB_PAGE_FREE_CELL_NXOFFSET(pPage, pCell);
for (;;) {
// Find a cell
if (szCell >= size) {
if (szCell - size >= pPage->szFreeCell) {
SCell *pTmpCell = pCell + size;
TDB_PAGE_FREE_CELL_SIZE_SET(pPage, pTmpCell, szCell - size);
TDB_PAGE_FREE_CELL_NXOFFSET_SET(pPage, pTmpCell, nxOffset);
// TODO: *pOffset = pTmpCell - pPage->pData;
} else {
TDB_PAGE_NFREE_SET(pPage, TDB_PAGE_NFREE(pPage) + szCell - size);
// TODO: *pOffset = nxOffset;
}
break;
}
// Not find a cell yet
if (nxOffset > 0) {
pCell = pPage->pData + nxOffset;
pOffset = TDB_PAGE_FREE_CELL_NXOFFSET_PTR(pPage, pCell);
szCell = TDB_PAGE_FREE_CELL_SIZE(pPage, pCell);
nxOffset = TDB_PAGE_FREE_CELL_NXOFFSET(pPage, pCell);
continue;
} else {
pCell = NULL;
break;
}
}
if (pCell) {
pPage->pFreeStart = pPage->pFreeStart + pPage->szOffset;
}
}
// 3. Try to dfragment and allocate again
if (pCell == NULL) {
ret = tdbPageDefragment(pPage);
if (ret < 0) {
return -1;
}
ASSERT(pPage->pFreeEnd - pPage->pFreeStart > size + pPage->szOffset);
ASSERT(pPage->nFree == pPage->pFreeEnd - pPage->pFreeStart);
// Allocate from the free space area again
pPage->pFreeEnd -= size;
pPage->pFreeStart += pPage->szOffset;
pCell = pPage->pFreeEnd;
}
ASSERT(pCell != NULL);
pPage->nFree = pPage->nFree - size - pPage->szOffset;
*ppCell = pCell;
return 0;
}
static int tdbPageFree(SPage *pPage, int idx, SCell *pCell, int size) {
// TODO
return 0;
}
static int tdbPageDefragment(SPage *pPage) {
// TODO
ASSERT(0);
return 0;
}
\ No newline at end of file
......@@ -60,7 +60,7 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
zsize = sizeof(*pPager) /* SPager */
+ fsize + 1 /* dbFileName */
+ fsize + 8 + 1; /* jFileName */
pPtr = (uint8_t *)taosMemoryCalloc(1, zsize);
pPtr = (uint8_t *)calloc(1, zsize);
if (pPtr == NULL) {
return -1;
}
......@@ -255,6 +255,10 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
return 0;
}
void tdbPagerReturnPage(SPager *pPager, SPage *pPage) {
tdbPCacheRelease(pPager->pCache, pPage);
}
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
// TODO: Allocate a page from the free list
return 0;
......
......@@ -20,10 +20,15 @@
extern "C" {
#endif
typedef struct SBTree SBTree;
typedef struct SBtCursor SBtCursor;
typedef struct SBTree SBTree;
typedef struct SBTC SBTC;
typedef struct SBtInfo {
SPgno root;
int nLevel;
int nData;
} SBtInfo;
struct SBtCursor {
struct SBTC {
SBTree *pBt;
i8 iPage;
SPage *pPage;
......@@ -33,10 +38,19 @@ struct SBtCursor {
void *pBuf;
};
// SBTree
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, FKeyComparator kcmpr, SBTree **ppBt);
int tdbBtreeClose(SBTree *pBt);
int tdbBtreeCursor(SBtCursor *pCur, SBTree *pBt);
int tdbBtCursorInsert(SBtCursor *pCur, const void *pKey, int kLen, const void *pVal, int vLen);
int tdbBtCursorInsert(SBTC *pCur, const void *pKey, int kLen, const void *pVal, int vLen);
int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen);
// SBTC
int tdbBtcOpen(SBTC *pCur, SBTree *pBt);
int tdbBtcMoveToFirst(SBTC *pBtc);
int tdbBtcMoveToLast(SBTC *pBtc);
int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen);
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbBtcClose(SBTC *pBtc);
#ifdef __cplusplus
}
......
......@@ -20,12 +20,20 @@
extern "C" {
#endif
typedef struct STDb STDb;
typedef struct STDB STDB;
typedef struct STDBC STDBC;
int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprFn, STEnv *pEnv, STDb **ppDb);
int tdbDbClose(STDb *pDb);
int tdbDbDrop(STDb *pDb);
int tdbDbInsert(STDb *pDb, const void *pKey, int keyLen, const void *pVal, int valLen);
// STDB
int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprFn, STEnv *pEnv, STDB **ppDb);
int tdbDbClose(STDB *pDb);
int tdbDbDrop(STDB *pDb);
int tdbDbInsert(STDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen);
int tdbDbGet(STDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
// STDBC
int tdbDbcOpen(STDB *pDb, STDBC **ppDbc);
int tdbDbNext(STDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbDbcClose(STDBC *pDbc);
#ifdef __cplusplus
}
......
......@@ -95,7 +95,7 @@ static FORCE_INLINE int tdbCmprPgId(const void *p1, const void *p2) {
// tdb_log
#define tdbError(var)
typedef TD_DLIST(STDb) STDbList;
typedef TD_DLIST(STDB) STDbList;
typedef TD_DLIST(SPgFile) SPgFileList;
typedef TD_DLIST_NODE(SPgFile) SPgFileListNode;
......@@ -141,8 +141,8 @@ typedef int (*FKeyComparator)(const void *pKey1, int kLen1, const void *pKey2, i
#define TDB_FLAG_IS(flags, flag) ((flags) == (flag))
#define TDB_FLAG_HAS(flags, flag) (((flags) & (flag)) != 0)
#define TDB_FLAG_NO(flags, flag) ((flags) & (flag) == 0)
#define TDB_FLAG_ADD(flags, flag) ((flags) |= (flag))
#define TDB_FLAG_REMOVE(flags, flag) ((flags) &= (~(flag)))
#define TDB_FLAG_ADD(flags, flag) ((flags) | (flag))
#define TDB_FLAG_REMOVE(flags, flag) ((flags) & (~(flag)))
typedef struct SPager SPager;
typedef struct SPCache SPCache;
......
......@@ -21,23 +21,22 @@ extern "C" {
#endif
#define TDB_PCACHE_PAGE \
u8 isAnchor; \
u8 isLocalPage; \
u8 isDirty; \
i32 nRef; \
SPCache *pCache; \
SPage *pFreeNext; \
SPage *pHashNext; \
SPage *pLruNext; \
SPage *pLruPrev; \
SPage *pDirtyNext; \
SPager *pPager; \
SPgid pgid;
u8 isAnchor; \
u8 isLocalPage; \
u8 isDirty; \
i32 nRef; \
SPage *pFreeNext; \
SPage *pHashNext; \
SPage *pLruNext; \
SPage *pLruPrev; \
SPage *pDirtyNext; \
SPager *pPager; \
SPgid pgid;
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache);
int tdbPCacheClose(SPCache *pCache);
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, bool alcNewPage);
void tdbPCacheRelease(SPage *pPage);
void tdbPCacheRelease(SPCache *pCache, SPage *pPage);
int tdbPCacheGetPageSize(SPCache *pCache);
#ifdef __cplusplus
......
......@@ -27,9 +27,6 @@ typedef struct {
int szOffset;
int szPageHdr;
int szFreeCell;
// flags
u16 (*getFlags)(SPage *);
void (*setFlags)(SPage *, u16);
// cell number
int (*getCellNum)(SPage *);
void (*setCellNum)(SPage *, int);
......@@ -45,6 +42,9 @@ typedef struct {
// cell offset at idx
int (*getCellOffset)(SPage *, int);
void (*setCellOffset)(SPage *, int, int);
// free cell info
void (*getFreeCellInfo)(SCell *pCell, int *szCell, int *nxOffset);
void (*setFreeCellInfo)(SCell *pCell, int szCell, int nxOffset);
} SPageMethods;
// Page footer
......@@ -53,58 +53,37 @@ typedef struct __attribute__((__packed__)) {
} SPageFtr;
struct SPage {
TdThreadSpinlock lock;
u8 *pData;
pthread_spinlock_t lock;
int pageSize;
u8 *pData;
SPageMethods *pPageMethods;
// Fields below used by pager and am
u8 szAmHdr;
u8 *pPageHdr;
u8 *pAmHdr;
u8 *pCellIdx;
u8 *pFreeStart;
u8 *pFreeEnd;
SPageFtr *pPageFtr;
int nOverflow;
SCell *apOvfl[4];
int aiOvfl[4];
int kLen; // key length of the page, -1 for unknown
int vLen; // value length of the page, -1 for unknown
int nFree;
int maxLocal;
int minLocal;
int nOverflow;
SCell *apOvfl[4];
int aiOvfl[4];
int (*xCellSize)(const SPage *, SCell *);
// Fields used by SPCache
TDB_PCACHE_PAGE
};
/* For page */
#define TDB_PAGE_FLAGS(pPage) (*(pPage)->pPageMethods->getFlags)(pPage)
#define TDB_PAGE_NCELLS(pPage) (*(pPage)->pPageMethods->getCellNum)(pPage)
#define TDB_PAGE_CCELLS(pPage) (*(pPage)->pPageMethods->getCellBody)(pPage)
#define TDB_PAGE_FCELL(pPage) (*(pPage)->pPageMethods->getCellFree)(pPage)
#define TDB_PAGE_NFREE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
#define TDB_PAGE_CELL_OFFSET_AT(pPage, idx) (*(pPage)->pPageMethods->getCellOffset)(pPage, idx)
#define TDB_PAGE_FLAGS_SET(pPage, FLAGS) (*(pPage)->pPageMethods->setFlags)(pPage, FLAGS)
#define TDB_PAGE_NCELLS_SET(pPage, NCELLS) (*(pPage)->pPageMethods->setCellNum)(pPage, NCELLS)
#define TDB_PAGE_CCELLS_SET(pPage, CCELLS) (*(pPage)->pPageMethods->setCellBody)(pPage, CCELLS)
#define TDB_PAGE_FCELL_SET(pPage, FCELL) (*(pPage)->pPageMethods->setCellFree)(pPage, FCELL)
#define TDB_PAGE_NFREE_SET(pPage, NFREE) (*(pPage)->pPageMethods->setFreeBytes)(pPage, NFREE)
#define TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, OFFSET) (*(pPage)->pPageMethods->setCellOffset)(pPage, idx, OFFSET)
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
#define TDB_PAGE_CELL_AT(pPage, idx) ((pPage)->pData + TDB_PAGE_CELL_OFFSET_AT(pPage, idx))
// For page lock
#define P_LOCK_SUCC 0
#define P_LOCK_BUSY 1
#define P_LOCK_FAIL -1
#define TDB_INIT_PAGE_LOCK(pPage) taosThreadSpinInit(&((pPage)->lock), 0)
#define TDB_DESTROY_PAGE_LOCK(pPage) taosThreadSpinDestroy(&((pPage)->lock))
#define TDB_LOCK_PAGE(pPage) taosThreadSpinLock(&((pPage)->lock))
#define TDB_UNLOCK_PAGE(pPage) taosThreadSpinUnlock(&((pPage)->lock))
#define TDB_INIT_PAGE_LOCK(pPage) pthread_spin_init(&((pPage)->lock), 0)
#define TDB_DESTROY_PAGE_LOCK(pPage) pthread_spin_destroy(&((pPage)->lock))
#define TDB_LOCK_PAGE(pPage) pthread_spin_lock(&((pPage)->lock))
#define TDB_UNLOCK_PAGE(pPage) pthread_spin_unlock(&((pPage)->lock))
#define TDB_TRY_LOCK_PAGE(pPage) \
({ \
int ret; \
......@@ -119,10 +98,43 @@ struct SPage {
})
// APIs
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg);
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg);
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell);
int tdbPageDropCell(SPage *pPage, int idx);
#define TDB_PAGE_TOTAL_CELLS(pPage) ((pPage)->nOverflow + (pPage)->pPageMethods->getCellNum(pPage))
#define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx)
#define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno)
#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell) + (pPage)->pPageMethods->szOffset)
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg);
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg);
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *));
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *));
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl);
int tdbPageDropCell(SPage *pPage, int idx);
void tdbPageCopy(SPage *pFromPage, SPage *pToPage);
static inline SCell *tdbPageGetCell(SPage *pPage, int idx) {
SCell *pCell;
int iOvfl;
int lidx;
ASSERT(idx >= 0 && idx < TDB_PAGE_TOTAL_CELLS(pPage));
iOvfl = 0;
for (; iOvfl < pPage->nOverflow; iOvfl++) {
if (pPage->aiOvfl[iOvfl] == idx) {
pCell = pPage->apOvfl[iOvfl];
return pCell;
} else if (pPage->aiOvfl[iOvfl] > idx) {
break;
}
}
lidx = idx - iOvfl;
ASSERT(lidx >= 0 && lidx < pPage->pPageMethods->getCellNum(pPage));
pCell = pPage->pData + pPage->pPageMethods->getCellOffset(pPage, lidx);
return pCell;
}
#ifdef __cplusplus
}
......
......@@ -20,15 +20,16 @@
extern "C" {
#endif
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
int tdbPagerClose(SPager *pPager);
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate);
int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager);
int tdbPagerCommit(SPager *pPager);
int tdbPagerGetPageSize(SPager *pPager);
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
int tdbPagerClose(SPager *pPager);
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate);
int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager);
int tdbPagerCommit(SPager *pPager);
int tdbPagerGetPageSize(SPager *pPager);
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage);
#ifdef __cplusplus
}
......
......@@ -39,6 +39,38 @@ int tdbGetFileSize(const char *fname, int pgSize, SPgno *pSize);
int tdbPRead(int fd, void *pData, int count, i64 offset);
#define TDB_REALLOC(PTR, SIZE) \
({ \
void *nPtr; \
if ((PTR) == NULL || ((int *)(PTR))[-1] < (SIZE)) { \
nPtr = realloc((PTR) ? (char *)(PTR) - sizeof(int) : NULL, (SIZE) + sizeof(int)); \
if (nPtr) { \
((int *)nPtr)[0] = (SIZE); \
nPtr = (char *)nPtr + sizeof(int); \
} \
} else { \
nPtr = (PTR); \
} \
nPtr; \
})
#define TDB_FREE(PTR) \
do { \
if (PTR) { \
free((char *)(PTR) - sizeof(int)); \
} \
} while (0)
static inline void *tdbOsMalloc(void *arg, size_t size) {
void *ptr;
ptr = malloc(size);
return ptr;
}
static inline void tdbOsFree(void *arg, void *ptr) { free(ptr); }
static inline int tdbPutVarInt(u8 *p, int v) {
int n = 0;
......
......@@ -18,13 +18,25 @@
extern SPageMethods pageMethods;
extern SPageMethods pageLargeMethods;
typedef struct __attribute__((__packed__)) {
u16 szCell;
u16 nxOffset;
} SFreeCell;
#define TDB_PAGE_HDR_SIZE(pPage) ((pPage)->pPageMethods->szPageHdr)
#define TDB_PAGE_FREE_CELL_SIZE(pPage) ((pPage)->pPageMethods->szFreeCell)
#define TDB_PAGE_NCELLS(pPage) (*(pPage)->pPageMethods->getCellNum)(pPage)
#define TDB_PAGE_CCELLS(pPage) (*(pPage)->pPageMethods->getCellBody)(pPage)
#define TDB_PAGE_FCELL(pPage) (*(pPage)->pPageMethods->getCellFree)(pPage)
#define TDB_PAGE_NFREE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
#define TDB_PAGE_CELL_OFFSET_AT(pPage, idx) (*(pPage)->pPageMethods->getCellOffset)(pPage, idx)
#define TDB_PAGE_NCELLS_SET(pPage, NCELLS) (*(pPage)->pPageMethods->setCellNum)(pPage, NCELLS)
#define TDB_PAGE_CCELLS_SET(pPage, CCELLS) (*(pPage)->pPageMethods->setCellBody)(pPage, CCELLS)
#define TDB_PAGE_FCELL_SET(pPage, FCELL) (*(pPage)->pPageMethods->setCellFree)(pPage, FCELL)
#define TDB_PAGE_NFREE_SET(pPage, NFREE) (*(pPage)->pPageMethods->setFreeBytes)(pPage, NFREE)
#define TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, OFFSET) (*(pPage)->pPageMethods->setCellOffset)(pPage, idx, OFFSET)
#define TDB_PAGE_CELL_AT(pPage, idx) ((pPage)->pData + TDB_PAGE_CELL_OFFSET_AT(pPage, idx))
#define TDB_PAGE_MAX_FREE_BLOCK(pPage, szAmHdr) \
((pPage)->pageSize - (szAmHdr)-TDB_PAGE_HDR_SIZE(pPage) - sizeof(SPageFtr))
static int tdbPageAllocate(SPage *pPage, int size, SCell **ppCell);
static int tdbPageDefragment(SPage *pPage);
static int tdbPageFree(SPage *pPage, int idx, SCell *pCell, int szCell);
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg) {
SPage *pPage;
......@@ -35,25 +47,26 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t)
*ppPage = NULL;
size = pageSize + sizeof(*pPage);
if (xMalloc == NULL) {
xMalloc = tdbOsMalloc;
}
ptr = (u8 *)((*xMalloc)(arg, size));
if (pPage == NULL) {
if (ptr == NULL) {
return -1;
}
memset(ptr, 0, size);
pPage = (SPage *)(ptr + pageSize);
pPage->pData = ptr;
TDB_INIT_PAGE_LOCK(pPage);
pPage->pageSize = pageSize;
pPage->pData = ptr;
if (pageSize < 65536) {
pPage->pPageMethods = &pageMethods;
} else {
pPage->pPageMethods = &pageLargeMethods;
}
TDB_INIT_PAGE_LOCK(pPage);
/* TODO */
*ppPage = pPage;
return 0;
......@@ -62,157 +75,365 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t)
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) {
u8 *ptr;
if (!xFree) {
xFree = tdbOsFree;
}
ptr = pPage->pData;
(*xFree)(arg, ptr);
return 0;
}
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) {
int ret;
SCell *pTarget;
u8 *pTmp;
int j;
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)) {
pPage->pPageHdr = pPage->pData + szAmHdr;
TDB_PAGE_NCELLS_SET(pPage, 0);
TDB_PAGE_CCELLS_SET(pPage, pPage->pageSize - sizeof(SPageFtr));
TDB_PAGE_FCELL_SET(pPage, 0);
TDB_PAGE_NFREE_SET(pPage, TDB_PAGE_MAX_FREE_BLOCK(pPage, szAmHdr));
pPage->pCellIdx = pPage->pPageHdr + TDB_PAGE_HDR_SIZE(pPage);
pPage->pFreeStart = pPage->pCellIdx;
pPage->pFreeEnd = pPage->pData + TDB_PAGE_CCELLS(pPage);
pPage->pPageFtr = (SPageFtr *)(pPage->pData + pPage->pageSize - sizeof(SPageFtr));
pPage->nOverflow = 0;
pPage->xCellSize = xCellSize;
ASSERT((u8 *)pPage->pPageFtr == pPage->pFreeEnd);
}
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)) {
pPage->pPageHdr = pPage->pData + szAmHdr;
pPage->pCellIdx = pPage->pPageHdr + TDB_PAGE_HDR_SIZE(pPage);
pPage->pFreeStart = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * TDB_PAGE_NCELLS(pPage);
pPage->pFreeEnd = pPage->pData + TDB_PAGE_CCELLS(pPage);
pPage->pPageFtr = (SPageFtr *)(pPage->pData + pPage->pageSize - sizeof(SPageFtr));
pPage->nOverflow = 0;
pPage->xCellSize = xCellSize;
ASSERT(pPage->pFreeEnd >= pPage->pFreeStart);
ASSERT(pPage->pFreeEnd - pPage->pFreeStart <= TDB_PAGE_NFREE(pPage));
}
if (pPage->nOverflow || szCell + TDB_PAGE_OFFSET_SIZE(pPage) > pPage->nFree) {
// TODO: need to figure out if pCell may be used by outside of this function
j = pPage->nOverflow++;
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl) {
int nFree;
int nCells;
int iOvfl;
int lidx; // local idx
SCell *pNewCell;
pPage->apOvfl[j] = pCell;
pPage->aiOvfl[j] = idx;
} else {
ret = tdbPageAllocate(pPage, szCell, &pTarget);
if (ret < 0) {
return -1;
ASSERT(szCell <= TDB_PAGE_MAX_FREE_BLOCK(pPage, pPage->pPageHdr - pPage->pData));
nFree = TDB_PAGE_NFREE(pPage);
nCells = TDB_PAGE_NCELLS(pPage);
iOvfl = 0;
for (; iOvfl < pPage->nOverflow; iOvfl++) {
if (pPage->aiOvfl[iOvfl] >= idx) {
break;
}
}
lidx = idx - iOvfl;
if (asOvfl || nFree < szCell + TDB_PAGE_OFFSET_SIZE(pPage)) {
// TODO: make it extensible
// add the cell as an overflow cell
for (int i = pPage->nOverflow; i > iOvfl; i--) {
pPage->apOvfl[i] = pPage->apOvfl[i - 1];
pPage->aiOvfl[i] = pPage->aiOvfl[i - 1];
}
memcpy(pTarget, pCell, szCell);
pTmp = pPage->pCellIdx + idx * TDB_PAGE_OFFSET_SIZE(pPage);
memmove(pTmp + TDB_PAGE_OFFSET_SIZE(pPage), pTmp, pPage->pFreeStart - pTmp - TDB_PAGE_OFFSET_SIZE(pPage));
TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, pTarget - pPage->pData);
TDB_PAGE_NCELLS_SET(pPage, TDB_PAGE_NCELLS(pPage) + 1);
// TODO: here has memory leak
pNewCell = (SCell *)malloc(szCell);
memcpy(pNewCell, pCell, szCell);
pPage->apOvfl[iOvfl] = pNewCell;
pPage->aiOvfl[iOvfl] = idx;
pPage->nOverflow++;
iOvfl++;
} else {
// page must has enough space to hold the cell locally
tdbPageAllocate(pPage, szCell, &pNewCell);
memcpy(pNewCell, pCell, szCell);
// no overflow cell exists in this page
u8 *src = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * lidx;
u8 *dest = src + TDB_PAGE_OFFSET_SIZE(pPage);
memmove(dest, src, pPage->pFreeStart - dest);
TDB_PAGE_CELL_OFFSET_AT_SET(pPage, lidx, pNewCell - pPage->pData);
TDB_PAGE_NCELLS_SET(pPage, nCells + 1);
ASSERT(pPage->pFreeStart == pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * (nCells + 1));
}
for (; iOvfl < pPage->nOverflow; iOvfl++) {
pPage->aiOvfl[iOvfl]++;
}
return 0;
}
int tdbPageDropCell(SPage *pPage, int idx) {
// TODO
int lidx;
SCell *pCell;
int szCell;
int nCells;
int iOvfl;
nCells = TDB_PAGE_NCELLS(pPage);
ASSERT(idx >= 0 && idx < nCells + pPage->nOverflow);
iOvfl = 0;
for (; iOvfl < pPage->nOverflow; iOvfl++) {
if (pPage->aiOvfl[iOvfl] == idx) {
// remove the over flow cell
for (; (++iOvfl) < pPage->nOverflow;) {
pPage->aiOvfl[iOvfl - 1] = pPage->aiOvfl[iOvfl] - 1;
pPage->apOvfl[iOvfl - 1] = pPage->apOvfl[iOvfl];
}
pPage->nOverflow--;
return 0;
} else if (pPage->aiOvfl[iOvfl] > idx) {
break;
}
}
lidx = idx - iOvfl;
pCell = TDB_PAGE_CELL_AT(pPage, lidx);
szCell = (*pPage->xCellSize)(pPage, pCell);
tdbPageFree(pPage, lidx, pCell, szCell);
TDB_PAGE_NCELLS_SET(pPage, nCells - 1);
for (; iOvfl < pPage->nOverflow; iOvfl++) {
pPage->aiOvfl[iOvfl]--;
ASSERT(pPage->aiOvfl[iOvfl] > 0);
}
return 0;
}
static int tdbPageAllocate(SPage *pPage, int size, SCell **ppCell) {
SCell *pCell;
SFreeCell *pFreeCell;
u8 *pOffset;
int ret;
void tdbPageCopy(SPage *pFromPage, SPage *pToPage) {
int delta, nFree;
ASSERT(pPage->nFree > size + TDB_PAGE_OFFSET_SIZE(pPage));
pToPage->pFreeStart = pToPage->pPageHdr + (pFromPage->pFreeStart - pFromPage->pPageHdr);
pToPage->pFreeEnd = (u8 *)(pToPage->pPageFtr) - ((u8 *)pFromPage->pPageFtr - pFromPage->pFreeEnd);
ASSERT(pToPage->pFreeEnd >= pToPage->pFreeStart);
memcpy(pToPage->pPageHdr, pFromPage->pPageHdr, pFromPage->pFreeStart - pFromPage->pPageHdr);
memcpy(pToPage->pFreeEnd, pFromPage->pFreeEnd, (u8 *)pFromPage->pPageFtr - pFromPage->pFreeEnd);
ASSERT(TDB_PAGE_CCELLS(pToPage) == pToPage->pFreeEnd - pToPage->pData);
delta = (pToPage->pPageHdr - pToPage->pData) - (pFromPage->pPageHdr - pFromPage->pData);
if (delta != 0) {
nFree = TDB_PAGE_NFREE(pFromPage);
TDB_PAGE_NFREE_SET(pToPage, nFree - delta);
}
// Copy the overflow cells
for (int iOvfl = 0; iOvfl < pFromPage->nOverflow; iOvfl++) {
pToPage->aiOvfl[iOvfl] = pFromPage->aiOvfl[iOvfl];
pToPage->apOvfl[iOvfl] = pFromPage->apOvfl[iOvfl];
}
pToPage->nOverflow = pFromPage->nOverflow;
}
static int tdbPageAllocate(SPage *pPage, int szCell, SCell **ppCell) {
SCell *pFreeCell;
u8 *pOffset;
int nFree;
int ret;
int cellFree;
SCell *pCell = NULL;
pCell = NULL;
*ppCell = NULL;
nFree = TDB_PAGE_NFREE(pPage);
// 1. Try to allocate from the free space area
if (pPage->pFreeEnd - pPage->pFreeStart > size + TDB_PAGE_OFFSET_SIZE(pPage)) {
pPage->pFreeEnd -= size;
pPage->pFreeStart += TDB_PAGE_OFFSET_SIZE(pPage);
ASSERT(nFree >= szCell + TDB_PAGE_OFFSET_SIZE(pPage));
ASSERT(TDB_PAGE_CCELLS(pPage) == pPage->pFreeEnd - pPage->pData);
// 1. Try to allocate from the free space block area
if (pPage->pFreeEnd - pPage->pFreeStart >= szCell + TDB_PAGE_OFFSET_SIZE(pPage)) {
pPage->pFreeEnd -= szCell;
pCell = pPage->pFreeEnd;
TDB_PAGE_CCELLS_SET(pPage, pPage->pFreeEnd - pPage->pData);
goto _alloc_finish;
}
// 2. Try to allocate from the page free list
if ((pCell == NULL) && (pPage->pFreeEnd - pPage->pFreeStart >= TDB_PAGE_OFFSET_SIZE(pPage)) &&
TDB_PAGE_FCELL(pPage)) {
#if 0
int szCell;
int nxOffset;
pCell = pPage->pData + TDB_PAGE_FCELL(pPage);
pOffset = TDB_IS_LARGE_PAGE(pPage) ? ((SPageHdrL *)(pPage->pPageHdr))[0].fCell
: (u8 *)&(((SPageHdr *)(pPage->pPageHdr))[0].fCell);
szCell = TDB_PAGE_FREE_CELL_SIZE(pPage, pCell);
nxOffset = TDB_PAGE_FREE_CELL_NXOFFSET(pPage, pCell);
cellFree = TDB_PAGE_FCELL(pPage);
ASSERT(cellFree == 0 || cellFree > pPage->pFreeEnd - pPage->pData);
if (cellFree && pPage->pFreeEnd - pPage->pFreeStart >= TDB_PAGE_OFFSET_SIZE(pPage)) {
SCell *pPrevFreeCell = NULL;
int szPrevFreeCell;
int szFreeCell;
int nxFreeCell;
int newSize;
for (;;) {
// Find a cell
if (szCell >= size) {
if (szCell - size >= pPage->szFreeCell) {
SCell *pTmpCell = pCell + size;
TDB_PAGE_FREE_CELL_SIZE_SET(pPage, pTmpCell, szCell - size);
TDB_PAGE_FREE_CELL_NXOFFSET_SET(pPage, pTmpCell, nxOffset);
// TODO: *pOffset = pTmpCell - pPage->pData;
if (cellFree == 0) break;
pFreeCell = pPage->pData + cellFree;
pPage->pPageMethods->getFreeCellInfo(pFreeCell, &szFreeCell, &nxFreeCell);
if (szFreeCell >= szCell) {
pCell = pFreeCell;
newSize = szFreeCell - szCell;
pFreeCell += szCell;
if (newSize >= TDB_PAGE_FREE_CELL_SIZE(pPage)) {
pPage->pPageMethods->setFreeCellInfo(pFreeCell, newSize, nxFreeCell);
if (pPrevFreeCell) {
pPage->pPageMethods->setFreeCellInfo(pPrevFreeCell, szPrevFreeCell, pFreeCell - pPage->pData);
} else {
TDB_PAGE_FCELL_SET(pPage, pFreeCell - pPage->pData);
}
} else {
TDB_PAGE_NFREE_SET(pPage, TDB_PAGE_NFREE(pPage) + szCell - size);
// TODO: *pOffset = nxOffset;
if (pPrevFreeCell) {
pPage->pPageMethods->setFreeCellInfo(pPrevFreeCell, szPrevFreeCell, nxFreeCell);
} else {
TDB_PAGE_FCELL_SET(pPage, nxFreeCell);
}
}
break;
}
// Not find a cell yet
if (nxOffset > 0) {
pCell = pPage->pData + nxOffset;
pOffset = TDB_PAGE_FREE_CELL_NXOFFSET_PTR(pPage, pCell);
szCell = TDB_PAGE_FREE_CELL_SIZE(pPage, pCell);
nxOffset = TDB_PAGE_FREE_CELL_NXOFFSET(pPage, pCell);
continue;
goto _alloc_finish;
} else {
pCell = NULL;
break;
pPrevFreeCell = pFreeCell;
szPrevFreeCell = szFreeCell;
cellFree = nxFreeCell;
}
}
if (pCell) {
pPage->pFreeStart = pPage->pFreeStart + pPage->szOffset;
}
#endif
}
// 3. Try to dfragment and allocate again
if (pCell == NULL) {
ret = tdbPageDefragment(pPage);
if (ret < 0) {
return -1;
}
tdbPageDefragment(pPage);
ASSERT(pPage->pFreeEnd - pPage->pFreeStart == nFree);
ASSERT(nFree == TDB_PAGE_NFREE(pPage));
ASSERT(pPage->pFreeEnd - pPage->pData == TDB_PAGE_CCELLS(pPage));
pPage->pFreeEnd -= szCell;
pCell = pPage->pFreeEnd;
TDB_PAGE_CCELLS_SET(pPage, pPage->pFreeEnd - pPage->pData);
_alloc_finish:
ASSERT(pCell);
pPage->pFreeStart += TDB_PAGE_OFFSET_SIZE(pPage);
TDB_PAGE_NFREE_SET(pPage, nFree - szCell - TDB_PAGE_OFFSET_SIZE(pPage));
*ppCell = pCell;
return 0;
}
ASSERT(pPage->pFreeEnd - pPage->pFreeStart > size + TDB_PAGE_OFFSET_SIZE(pPage));
ASSERT(pPage->nFree == pPage->pFreeEnd - pPage->pFreeStart);
static int tdbPageFree(SPage *pPage, int idx, SCell *pCell, int szCell) {
int nFree;
int cellFree;
u8 *dest;
u8 *src;
// Allocate from the free space area again
pPage->pFreeEnd -= size;
pPage->pFreeStart += TDB_PAGE_OFFSET_SIZE(pPage);
pCell = pPage->pFreeEnd;
}
ASSERT(pCell >= pPage->pFreeEnd);
ASSERT(pCell + szCell <= (u8 *)(pPage->pPageFtr));
ASSERT(pCell == TDB_PAGE_CELL_AT(pPage, idx));
ASSERT(pCell != NULL);
nFree = TDB_PAGE_NFREE(pPage);
pPage->nFree = pPage->nFree - size - TDB_PAGE_OFFSET_SIZE(pPage);
*ppCell = pCell;
return 0;
}
if (pCell == pPage->pFreeEnd) {
pPage->pFreeEnd += szCell;
TDB_PAGE_CCELLS_SET(pPage, pPage->pFreeEnd - pPage->pData);
} else {
if (szCell >= TDB_PAGE_FREE_CELL_SIZE(pPage)) {
cellFree = TDB_PAGE_FCELL(pPage);
pPage->pPageMethods->setFreeCellInfo(pCell, szCell, cellFree);
TDB_PAGE_FCELL_SET(pPage, pCell - pPage->pData);
} else {
ASSERT(0);
}
}
dest = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * idx;
src = dest + TDB_PAGE_OFFSET_SIZE(pPage);
memmove(dest, src, pPage->pFreeStart - src);
static int tdbPageFree(SPage *pPage, int idx, SCell *pCell, int size) {
// TODO
pPage->pFreeStart -= TDB_PAGE_OFFSET_SIZE(pPage);
nFree = nFree + szCell + TDB_PAGE_OFFSET_SIZE(pPage);
TDB_PAGE_NFREE_SET(pPage, nFree);
return 0;
}
static int tdbPageDefragment(SPage *pPage) {
// TODO
ASSERT(0);
int nFree;
int nCells;
SCell *pCell;
SCell *pNextCell;
SCell *pTCell;
int szCell;
int idx;
int iCell;
ASSERT(pPage->pFreeEnd - pPage->pFreeStart < nFree);
nFree = TDB_PAGE_NFREE(pPage);
nCells = TDB_PAGE_NCELLS(pPage);
// Loop to compact the page content
// Here we use an O(n^2) algorithm to do the job since
// this is a low frequency job.
pNextCell = (u8 *)pPage->pPageFtr;
pCell = NULL;
for (iCell = 0;; iCell++) {
// compact over
if (iCell == nCells) {
pPage->pFreeEnd = pNextCell;
break;
}
for (int i = 0; i < nCells; i++) {
if (TDB_PAGE_CELL_OFFSET_AT(pPage, i) < pNextCell - pPage->pData) {
pTCell = TDB_PAGE_CELL_AT(pPage, i);
if (pCell == NULL || pCell < pTCell) {
pCell = pTCell;
idx = i;
}
} else {
continue;
}
}
ASSERT(pCell != NULL);
szCell = (*pPage->xCellSize)(pPage, pCell);
ASSERT(pCell + szCell <= pNextCell);
if (pCell + szCell < pNextCell) {
memmove(pNextCell - szCell, pCell, szCell);
}
pCell = NULL;
pNextCell = pNextCell - szCell;
TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, pNextCell - pPage->pData);
}
ASSERT(pPage->pFreeEnd - pPage->pFreeStart == nFree);
TDB_PAGE_CCELLS_SET(pPage, pPage->pFreeEnd - pPage->pData);
TDB_PAGE_FCELL_SET(pPage, 0);
return 0;
}
/* ---------------------------------------------------------------------------------------------------------- */
typedef struct __attribute__((__packed__)) {
u16 flags;
u16 cellNum;
u16 cellBody;
u16 cellFree;
u16 nFree;
} SPageHdr;
// flags
static inline u16 getPageFlags(SPage *pPage) { return ((SPageHdr *)(pPage->pPageHdr))[0].flags; }
static inline void setPageFlags(SPage *pPage, u16 flags) { ((SPageHdr *)(pPage->pPageHdr))[0].flags = flags; }
typedef struct __attribute__((__packed__)) {
u16 szCell;
u16 nxOffset;
} SFreeCell;
// cellNum
static inline int getPageCellNum(SPage *pPage) { return ((SPageHdr *)(pPage->pPageHdr))[0].cellNum; }
......@@ -253,20 +474,33 @@ static inline void setPageCellOffset(SPage *pPage, int idx, int offset) {
((u16 *)pPage->pCellIdx)[idx] = (u16)offset;
}
// free cell info
static inline void getPageFreeCellInfo(SCell *pCell, int *szCell, int *nxOffset) {
SFreeCell *pFreeCell = (SFreeCell *)pCell;
*szCell = pFreeCell->szCell;
*nxOffset = pFreeCell->nxOffset;
}
static inline void setPageFreeCellInfo(SCell *pCell, int szCell, int nxOffset) {
SFreeCell *pFreeCell = (SFreeCell *)pCell;
pFreeCell->szCell = szCell;
pFreeCell->nxOffset = nxOffset;
}
SPageMethods pageMethods = {
2, // szOffset
sizeof(SPageHdr), // szPageHdr
sizeof(SFreeCell), // szFreeCell
getPageFlags, // getPageFlags
setPageFlags, // setFlagsp
getPageCellNum, // getCellNum
setPageCellNum, // setCellNum
getPageCellBody, // getCellBody
setPageCellBody, // setCellBody
getPageCellFree, // getCellFree
setPageCellFree, // setCellFree
getPageNFree, // getFreeBytes
setPageNFree, // setFreeBytes
getPageCellOffset, // getCellOffset
setPageCellOffset // setCellOffset
2, // szOffset
sizeof(SPageHdr), // szPageHdr
sizeof(SFreeCell), // szFreeCell
getPageCellNum, // getCellNum
setPageCellNum, // setCellNum
getPageCellBody, // getCellBody
setPageCellBody, // setCellBody
getPageCellFree, // getCellFree
setPageCellFree, // setCellFree
getPageNFree, // getFreeBytes
setPageNFree, // setFreeBytes
getPageCellOffset, // getCellOffset
setPageCellOffset, // setCellOffset
getPageFreeCellInfo, // getFreeCellInfo
setPageFreeCellInfo // setFreeCellInfo
};
\ No newline at end of file
......@@ -16,11 +16,10 @@
#include "tdbInt.h"
typedef struct __attribute__((__packed__)) {
u16 flags;
u8 cellNum[3];
u8 cellBody[3];
u8 cellFree[3];
u8 nFree[3];
u8 cellNum[3];
u8 cellBody[3];
u8 cellFree[3];
u8 nFree[3];
} SPageHdrL;
typedef struct __attribute__((__packed__)) {
......@@ -28,10 +27,6 @@ typedef struct __attribute__((__packed__)) {
u8 nxOffset[3];
} SFreeCellL;
// flags
static inline u16 getPageFlags(SPage *pPage) { return ((SPageHdrL *)(pPage->pPageHdr))[0].flags; }
static inline void setPageFlags(SPage *pPage, u16 flags) { ((SPageHdrL *)(pPage->pPageHdr))[0].flags = flags; }
// cellNum
static inline int getPageCellNum(SPage *pPage) { return TDB_GET_U24(((SPageHdrL *)(pPage->pPageHdr))[0].cellNum); }
static inline void setPageCellNum(SPage *pPage, int cellNum) {
......@@ -66,20 +61,33 @@ static inline void setPageCellOffset(SPage *pPage, int idx, int offset) {
TDB_PUT_U24(pPage->pCellIdx + 3 * idx, offset);
}
// free cell info
static inline void getPageFreeCellInfo(SCell *pCell, int *szCell, int *nxOffset) {
SFreeCellL *pFreeCell = (SFreeCellL *)pCell;
*szCell = TDB_GET_U24(pFreeCell->szCell);
*nxOffset = TDB_GET_U24(pFreeCell->nxOffset);
}
static inline void setPageFreeCellInfo(SCell *pCell, int szCell, int nxOffset) {
SFreeCellL *pFreeCell = (SFreeCellL *)pCell;
TDB_PUT_U24(pFreeCell->szCell, szCell);
TDB_PUT_U24(pFreeCell->nxOffset, nxOffset);
}
SPageMethods pageLargeMethods = {
3, // szOffset
sizeof(SPageHdrL), // szPageHdr
sizeof(SFreeCellL), // szFreeCell
getPageFlags, // getPageFlags
setPageFlags, // setFlagsp
getPageCellNum, // getCellNum
setPageCellNum, // setCellNum
getPageCellBody, // getCellBody
setPageCellBody, // setCellBody
getPageCellFree, // getCellFree
setPageCellFree, // setCellFree
getPageNFree, // getFreeBytes
setPageNFree, // setFreeBytes
getPageCellOffset, // getCellOffset
setPageCellOffset // setCellOffset
3, // szOffset
sizeof(SPageHdrL), // szPageHdr
sizeof(SFreeCellL), // szFreeCell
getPageCellNum, // getCellNum
setPageCellNum, // setCellNum
getPageCellBody, // getCellBody
setPageCellBody, // setCellBody
getPageCellFree, // getCellFree
setPageCellFree, // setCellFree
getPageNFree, // getFreeBytes
setPageNFree, // setFreeBytes
getPageCellOffset, // getCellOffset
setPageCellOffset, // setCellOffset
getPageFreeCellInfo, // getFreeCellInfo
setPageFreeCellInfo // setFreeCellInfo
};
\ No newline at end of file
......@@ -2,28 +2,190 @@
#include "tdbInt.h"
#include <string>
typedef struct SPoolMem {
int64_t size;
struct SPoolMem *prev;
struct SPoolMem *next;
} SPoolMem;
static SPoolMem *openPool() {
SPoolMem *pPool = (SPoolMem *)malloc(sizeof(*pPool));
pPool->prev = pPool->next = pPool;
pPool->size = 0;
return pPool;
}
static void closePool(SPoolMem *pPool) {
SPoolMem *pMem;
do {
pMem = pPool->next;
if (pMem == pPool) break;
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
free(pMem);
} while (1);
assert(pPool->size == 0);
free(pPool);
}
static void *poolMalloc(void *arg, int size) {
void *ptr = NULL;
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = (SPoolMem *)malloc(sizeof(*pMem) + size);
if (pMem == NULL) {
assert(0);
}
pMem->size = sizeof(*pMem) + size;
pMem->next = pPool->next;
pMem->prev = pPool;
pPool->next->prev = pMem;
pPool->next = pMem;
pPool->size += pMem->size;
ptr = (void *)(&pMem[1]);
return ptr;
}
static void poolFree(void *arg, void *ptr) {
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = &(((SPoolMem *)ptr)[-1]);
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
free(pMem);
}
static int tKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
int k1, k2;
std::string s1((char *)pKey1 + 3, kLen1 - 3);
std::string s2((char *)pKey2 + 3, kLen2 - 3);
k1 = stoi(s1);
k2 = stoi(s2);
if (k1 < k2) {
return -1;
} else if (k1 > k2) {
return 1;
} else {
return 0;
}
}
static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) {
int mlen;
int cret;
ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL);
mlen = keyLen1 < keyLen2 ? keyLen1 : keyLen2;
cret = memcmp(pKey1, pKey2, mlen);
if (cret == 0) {
if (keyLen1 < keyLen2) {
cret = -1;
} else if (keyLen1 > keyLen2) {
cret = 1;
} else {
cret = 0;
}
}
return cret;
}
TEST(tdb_test, simple_test) {
int ret;
STEnv *pEnv;
STDb *pDb;
int ret;
STEnv *pEnv;
STDB *pDb;
FKeyComparator compFunc;
int nData = 10000000;
// Open Env
ret = tdbEnvOpen("tdb", 1024, 20, &pEnv);
ret = tdbEnvOpen("tdb", 4096, 256000, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
ret = tdbDbOpen("db.db", TDB_VARIANT_LEN, TDB_VARIANT_LEN, NULL, pEnv, &pDb);
compFunc = tKeyCmpr;
ret = tdbDbOpen("db.db", TDB_VARIANT_LEN, TDB_VARIANT_LEN, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
{ // Insert some data
{
char key[64];
char val[64];
for (int i = 1; i <= 1000; i++) {
sprintf(key, "key%d", i);
sprintf(val, "value%d", i);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val));
{ // Insert some data
for (int i = 1; i <= nData; i++) {
sprintf(key, "key%d", i);
sprintf(val, "value%d", i);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val));
GTEST_ASSERT_EQ(ret, 0);
}
}
{ // Query the data
void *pVal = NULL;
int vLen;
for (int i = 1; i <= nData; i++) {
sprintf(key, "key%d", i);
sprintf(val, "value%d", i);
ret = tdbDbGet(pDb, key, strlen(key), &pVal, &vLen);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, strlen(val));
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
}
TDB_FREE(pVal);
}
{ // Iterate to query the DB data
STDBC *pDBC;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int count = 0;
ret = tdbDbcOpen(pDb, &pDBC);
GTEST_ASSERT_EQ(ret, 0);
for (;;) {
ret = tdbDbNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
// std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
// std::cout << std::endl;
count++;
}
GTEST_ASSERT_EQ(count, nData);
tdbDbcClose(pDBC);
TDB_FREE(pKey);
TDB_FREE(pVal);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册