提交 0ec80ff4 编写于 作者: M Minglei Jin

tdb/ofp-recycle: recycle ofps when dropOfp

上级 fc79074e
......@@ -233,8 +233,9 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
int ret;
tdbBtcOpen(&btc, pBt, pTxn);
/*
btc.coder.ofps = taosArrayInit(8, sizeof(SPgno));
*/
tdbTrace("tdb delete, btc: %p, pTxn: %p", &btc, pTxn);
// move the cursor
......@@ -864,7 +865,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
((SIntHdr *)(pParent->pData))->pgno = ((SIntHdr *)(pNews[0]->pData))->pgno;
}
tdbPagerInsertFreePage(pBt->pPager, TDB_PAGE_PGNO(pNews[0]), pTxn);
tdbPagerInsertFreePage(pBt->pPager, pNews[0], pTxn);
}
for (int i = 0; i < 3; i++) {
......@@ -875,7 +876,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
for (pageIdx = 0; pageIdx < nOlds; ++pageIdx) {
if (pageIdx >= nNews) {
tdbPagerInsertFreePage(pBt->pPager, TDB_PAGE_PGNO(pOlds[pageIdx]), pTxn);
tdbPagerInsertFreePage(pBt->pPager, pOlds[pageIdx], pTxn);
}
tdbPagerReturnPage(pBt->pPager, pOlds[pageIdx], pTxn);
}
......@@ -1319,10 +1320,6 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
return -1;
}
if (pDecoder->ofps) {
taosArrayPush(pDecoder->ofps, &pgno);
}
ofpCell = tdbPageGetCell(ofp, 0);
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
......@@ -1529,8 +1526,8 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
if (pPage->vLen == TDB_VARIANT_LEN) {
if (!leaf) {
tdbError("tdb/btree-cell-size: not a leaf page.");
return -1;
tdbError("tdb/btree-cell-size: not a leaf page:%p, pgno:%" PRIu32 ".", pPage, TDB_PAGE_PGNO(pPage));
// return -1;
}
nHeader += tdbGetVarInt(pCell + nHeader, &vLen);
} else if (leaf) {
......@@ -1570,8 +1567,27 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
bytes = ofp->maxLocal - sizeof(SPgno);
}
SPgno origPgno = pgno;
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
ret = tdbPagerWrite(pBt->pPager, ofp);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1;
}
// tdbPageDropCell(ofp, 0, pTxn, pBt);
// tdbPageZero(ofp, sizeof(SLeafHdr), tdbBtreeCellSize);
// tdbPageZero(ofp, sizeof(SIntHdr), tdbBtreeCellSize);
// SIntHdr *pIntHdr = (SIntHdr *)(ofp->pData);
// pIntHdr->flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL);
// pIntHdr->pgno = 0;
// ofp->pPager = NULL;
tdbPagerInsertFreePage(pBt->pPager, ofp, pTxn);
// printf("tdb recycle, pTxn: %p, pgno:%u\n", pTxn, pgno);
tdbTrace("tdb recycle, pTxn: %p, pgno:%u", pTxn, origPgno);
tdbPagerReturnPage(pPage->pPager, ofp, pTxn);
nLeft -= bytes;
......@@ -1991,6 +2007,11 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
return -1;
}
if (TDB_BTREE_PAGE_IS_OVFL(pBtc->pPage)) {
tdbError("tdb/btc-move-downward: should not be a ovfl page here.");
return -1;
}
if (pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
pgno = ((SPgno *)pCell)[0];
......@@ -2081,14 +2102,6 @@ int tdbBtcDelete(SBTC *pBtc) {
tdbPageDropCell(pBtc->pPage, idx, pBtc->pTxn, pBtc->pBt);
// recycle ofps if any
if (pBtc->coder.ofps) {
for (int i = 0; i < TARRAY_SIZE(pBtc->coder.ofps); ++i) {
SPgno *pgno = taosArrayGet(pBtc->coder.ofps, i);
// tdbPagerInsertFreePage(pBtc->pBt->pPager, *pgno, pBtc->pTxn);
}
}
// update interior page or do balance
if (idx == nCells - 1) {
if (idx) {
......@@ -2384,10 +2397,6 @@ int tdbBtcClose(SBTC *pBtc) {
tdbTxnClose(pBtc->pTxn);
}
if (pBtc->coder.ofps) {
taosArrayDestroy(pBtc->coder.ofps);
}
return 0;
}
......
......@@ -292,7 +292,23 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
*/
return 0;
}
/*
int tdbPagerCancelDirty(SPager *pPager, SPage *pPage, TXN *pTxn) {
SRBTreeNode *pNode = tRBTreeGet(&pPager->rbt, (SRBTreeNode *)pPage);
if (pNode) {
pPage->isDirty = 0;
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
if (pTxn->jPageSet) {
hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
}
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
return 0;
}
*/
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
SPage *pPage;
int ret;
......@@ -700,8 +716,9 @@ void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) {
// TDB_PAGE_PGNO(pPage), pPage);
}
int tdbPagerInsertFreePage(SPager *pPager, SPgno pgno, TXN *pTxn) {
int code = 0;
int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn) {
int code = 0;
SPgno pgno = TDB_PAGE_PGNO(pPage);
// tdbError("tdb/insert-free-page: tbc get page: %d.", pgno);
code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn);
......@@ -710,6 +727,8 @@ int tdbPagerInsertFreePage(SPager *pPager, SPgno pgno, TXN *pTxn) {
return -1;
}
pPage->pPager = NULL;
return code;
}
......
......@@ -199,7 +199,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn);
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
TXN *pTxn);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
int tdbPagerInsertFreePage(SPager *pPager, SPgno pgno, TXN *pTxn);
int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn);
// int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
int tdbPagerRestoreJournals(SPager *pPager);
int tdbPagerRollback(SPager *pPager);
......
......@@ -690,3 +690,100 @@ TEST(TdbPageRecycleTest, recycly_seq_insert_ofp_again) {
insertOfp();
system("ls -l ./tdb");
}
// TEST(TdbPageRecycleTest, DISABLED_recycly_seq_insert_ofp_nocommit) {
TEST(TdbPageRecycleTest, recycly_seq_insert_ofp_nocommit) {
clearDb("tdb");
insertOfp();
system("ls -l ./tdb");
// open Env
int ret = 0;
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN *txn;
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
{ // delete the data
char const *key = "key123456789";
ret = tdbTbDelete(pDb, key, strlen(key) + 1, txn);
GTEST_ASSERT_EQ(ret, 0);
}
// 1, insert nData kv
{
int nData = nDataConst;
char key[64];
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
/*
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
*/
for (int iData = 0; iData < nData; ++iData) {
sprintf(key, "key%03d", iData);
sprintf(val, "value%03d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// start a new transaction
clearPool(pPool);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
}
/*
// generate value payload
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[32605];
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
// insert the generated big data
// char const *key = "key1";
char const *key = "key123456789";
ret = tdbTbInsert(pDb, key, strlen(key) + 1, val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
*/
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
closePool(pPool);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
system("ls -l ./tdb");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册