提交 0facc8e0 编写于 作者: M Minglei Jin

fix(tdb/assert/cleanup): remove asserts

上级 f608481f
......@@ -74,7 +74,10 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
SBTree *pBt;
int ret;
ASSERT(keyLen != 0);
if (keyLen == 0) {
tdbError("tdb/btree-open: key len cannot be zero.");
return -1;
}
*ppBt = NULL;
......@@ -152,7 +155,11 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
tdbPostCommit(pPager->pEnv, txn);
}
ASSERT(pgno != 0);
if (pgno == 0) {
tdbError("tdb/btree-open: pgno cannot be zero.");
tdbOsFree(pBt);
return -1;
}
pBt->root = pgno;
/*
// TODO: pBt->root
......@@ -192,7 +199,7 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
ret = tdbBtcMoveTo(&btc, pKey, kLen, &c);
if (ret < 0) {
tdbBtcClose(&btc);
ASSERT(0);
tdbError("tdb/btree-insert: btc move to failed with ret: %d.", ret);
return -1;
}
......@@ -202,17 +209,17 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
if (c > 0) {
btc.idx++;
} else if (c == 0) {
// dup key not allowed
tdbError("unable to insert dup key. pKey: %p, kLen: %d, btc: %p, pTxn: %p", pKey, kLen, &btc, pTxn);
// ASSERT(0);
// dup key not allowed with insert
tdbBtcClose(&btc);
tdbError("tdb/btree-insert: dup key. pKey: %p, kLen: %d, btc: %p, pTxn: %p", pKey, kLen, &btc, pTxn);
return -1;
}
}
ret = tdbBtcUpsert(&btc, pKey, kLen, pVal, vLen, 1);
if (ret < 0) {
ASSERT(0);
tdbBtcClose(&btc);
tdbError("tdb/btree-insert: btc upsert failed with ret: %d.", ret);
return -1;
}
......@@ -233,7 +240,7 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
ret = tdbBtcMoveTo(&btc, pKey, kLen, &c);
if (ret < 0) {
tdbBtcClose(&btc);
ASSERT(0);
tdbError("tdb/btree-delete: btc move to failed with ret: %d.", ret);
return -1;
}
......@@ -264,7 +271,7 @@ int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, i
// move the cursor
ret = tdbBtcMoveTo(&btc, pKey, nKey, &c);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btree-upsert: btc move to failed with ret: %d.", ret);
tdbBtcClose(&btc);
return -1;
}
......@@ -280,8 +287,8 @@ int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, i
ret = tdbBtcUpsert(&btc, pKey, nKey, pData, nData, c);
if (ret < 0) {
ASSERT(0);
tdbBtcClose(&btc);
tdbError("tdb/btree-upsert: btc upsert failed with ret: %d.", ret);
return -1;
}
......@@ -309,7 +316,8 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
ret = tdbBtcMoveTo(&btc, pKey, kLen, &cret);
if (ret < 0) {
tdbBtcClose(&btc);
ASSERT(0);
tdbError("tdb/btree-pget: btc move to failed with ret: %d.", ret);
return -1;
}
if (btc.idx < 0 || cret) {
......@@ -325,7 +333,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
pTKey = tdbRealloc(*ppKey, cd.kLen);
if (pTKey == NULL) {
tdbBtcClose(&btc);
ASSERT(0);
tdbError("tdb/btree-pget: realloc pTKey failed.");
return -1;
}
*ppKey = pTKey;
......@@ -337,7 +345,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
pTVal = tdbRealloc(*ppVal, cd.vLen);
if (pTVal == NULL) {
tdbBtcClose(&btc);
ASSERT(0);
tdbError("tdb/btree-pget: realloc pTVal failed.");
return -1;
}
*ppVal = pTVal;
......@@ -350,7 +358,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
}
if (TDB_CELLDECODER_FREE_VAL(&cd)) {
tdbDebug("tdb btc/pget/2 decoder: %p pVal free: %p", &cd, cd.pVal);
tdbTrace("tdb btc/pget/2 decoder: %p pVal free: %p", &cd, cd.pVal);
tdbFree(cd.pVal);
}
......@@ -381,36 +389,7 @@ static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2
}
return cret;
}
/*
static int tdbBtreeOpenImpl(SBTree *pBt) {
// Try to get the root page of the an existing btree
SPgno pgno;
SPage *pPage;
int ret;
{
// 1. TODO: Search the main DB to check if the DB exists
ret = tdbPagerOpenDB(pBt->pPager, &pgno, true, pBt);
ASSERT(ret == 0);
}
if (pgno != 0) {
pBt->root = pgno;
return 0;
}
// Try to create a new database
ret = tdbPagerAllocPage(pBt->pPager, &pgno);
if (ret < 0) {
ASSERT(0);
return -1;
}
ASSERT(pgno != 0);
pBt->root = pgno;
return 0;
}
*/
int tdbBtreeInitPage(SPage *pPage, void *arg, int init) {
SBTree *pBt;
u8 flags;
......@@ -560,7 +539,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
ret = tdbPagerFetchPage(pBt->pPager, &pgno, pOlds + i, tdbBtreeInitPage,
&((SBtreeInitPageArg){.pBt = pBt, .flags = 0}), pTxn);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btree-balance: fetch page failed with ret: %d.", ret);
return -1;
}
......@@ -722,7 +701,8 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
iarg.flags = flags;
ret = tdbPagerFetchPage(pBt->pPager, &pgno, pNews + iNew, tdbBtreeInitPage, &iarg, pTxn);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btree-balance: fetch page failed with ret: %d.", ret);
return -1;
}
ret = tdbPagerWrite(pBt->pPager, pNews[iNew]);
......@@ -1216,8 +1196,8 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
ret = tdbBtreeEncodePayload(pPage, pCell, nHeader, pKey, kLen, pVal, vLen, &nPayload, pTxn, pBt);
if (ret < 0) {
// TODO
ASSERT(0);
return 0;
tdbError("tdb/btree-encode-cell: encode payload failed with ret: %d.", ret);
return -1;
}
*szCell = nHeader + nPayload;
......@@ -1577,7 +1557,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage,
&((SBtreeInitPageArg){.pBt = pBt, .flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF}), pBtc->pTxn);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btc-move-tofirst: fetch page failed with ret: %d.", ret);
return -1;
}
......@@ -1621,7 +1601,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
ret = tdbBtcMoveDownward(pBtc);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btc-move-tofirst: btc move downward failed with ret: %d.", ret);
return -1;
}
......@@ -1646,7 +1626,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage,
&((SBtreeInitPageArg){.pBt = pBt, .flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF}), pBtc->pTxn);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btc-move-tolast: fetch page failed with ret: %d.", ret);
return -1;
}
......@@ -1694,7 +1674,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
ret = tdbBtcMoveDownward(pBtc);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btc-move-tolast: btc move downward failed with ret: %d.", ret);
return -1;
}
......@@ -1752,7 +1732,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
ret = tdbBtcMoveToNext(pBtc);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btree-next: btc move to next failed with ret: %d.", ret);
return -1;
}
......@@ -1798,7 +1778,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
ret = tdbBtcMoveToPrev(pBtc);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btree-prev: btc move to prev failed with ret: %d.", ret);
return -1;
}
......@@ -1841,7 +1821,7 @@ int tdbBtcMoveToNext(SBTC *pBtc) {
ret = tdbBtcMoveDownward(pBtc);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btc-move-tonext: btc move downward failed with ret: %d.", ret);
return -1;
}
......@@ -1914,7 +1894,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
ret = tdbPagerFetchPage(pBtc->pBt->pPager, &pgno, &pBtc->pPage, tdbBtreeInitPage,
&((SBtreeInitPageArg){.pBt = pBtc->pBt, .flags = 0}), pBtc->pTxn);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btc-move-downward: fetch page failed with ret: %d.", ret);
return -1;
}
......@@ -1969,7 +1949,10 @@ int tdbBtcDelete(SBTC *pBtc) {
int nKey;
int ret;
ASSERT(idx >= 0 && idx < nCells);
if (idx < 0 || idx >= nCells) {
tdbError("tdb/btc-delete: idx: %d out of range[%d, %d).", idx, 0, nCells);
return -1;
}
// drop the cell on the leaf
ret = tdbPagerWrite(pPager, pBtc->pPage);
......@@ -2007,7 +1990,7 @@ int tdbBtcDelete(SBTC *pBtc) {
ret = tdbPageUpdateCell(pPage, idx, pCell, szCell, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
tdbOsFree(pCell);
ASSERT(0);
tdbError("tdb/btc-delete: page update cell failed with ret: %d.", ret);
return -1;
}
tdbOsFree(pCell);
......@@ -2022,7 +2005,7 @@ int tdbBtcDelete(SBTC *pBtc) {
ret = tdbBtreeBalance(pBtc);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btc-delete: btree balance failed with ret: %d.", ret);
return -1;
}
}
......@@ -2045,7 +2028,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
szBuf = kLen + nData + 14;
pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize);
if (pBuf == NULL) {
ASSERT(0);
tdbError("tdb/btc-upsert: realloc pBuf failed.");
return -1;
}
pBtc->pBt->pBuf = pBuf;
......@@ -2054,7 +2037,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
// encode cell
ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pData, nData, pCell, &szCell, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btc-upsert: btree encode cell failed with ret: %d.", ret);
return -1;
}
......@@ -2076,7 +2059,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
ret = tdbPageUpdateCell(pBtc->pPage, pBtc->idx, pCell, szCell, pBtc->pTxn, pBtc->pBt);
}
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btc-upsert: page insert/update cell failed with ret: %d.", ret);
return -1;
}
......@@ -2084,7 +2067,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
if (pBtc->pPage->nOverflow > 0) {
ret = tdbBtreeBalance(pBtc);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/btc-upsert: btree balance failed with ret: %d.", ret);
return -1;
}
}
......@@ -2233,7 +2216,10 @@ int tdbBtcClose(SBTC *pBtc) {
if (pBtc->iPage < 0) return 0;
for (;;) {
ASSERT(pBtc->pPage);
if (NULL == pBtc->pPage) {
tdbError("tdb/btc-close: null ptr pPage.");
return -1;
}
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);
......
......@@ -247,7 +247,10 @@ void tdbEnvRemovePager(TDB *pDb, SPager *pPager) {
// remove from the list
for (ppPager = &pDb->pgrList; *ppPager && (*ppPager != pPager); ppPager = &((*ppPager)->pNext)) {
}
ASSERT(*ppPager == pPager);
if (*ppPager != pPager) {
tdbError("tdb/db: invalid pPager: %p, *ppPager: %p", pPager, *ppPager);
return;
}
*ppPager = pPager->pNext;
// remove from hash
......@@ -255,7 +258,10 @@ void tdbEnvRemovePager(TDB *pDb, SPager *pPager) {
ppPager = &pDb->pgrHash[hash % pDb->nPgrHash];
for (; *ppPager && *ppPager != pPager; ppPager = &((*ppPager)->pHashNext)) {
}
ASSERT(*ppPager == pPager);
if (*ppPager != pPager) {
tdbError("tdb/db: invalid pPager: %p, *ppPager: %p", pPager, *ppPager);
return;
}
*ppPager = pPager->pNext;
// decrease the counter
......
......@@ -236,10 +236,10 @@ void tdbPCacheInvalidatePage(SPCache *pCache, SPager *pPager, SPgno pgno) {
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
i32 nRef;
ASSERT(pTxn);
// nRef = tdbUnrefPage(pPage);
// ASSERT(nRef >= 0);
if (!pTxn) {
tdbError("tdb/pcache: null ptr pTxn, release failed.");
return;
}
tdbPCacheLock(pCache);
nRef = tdbUnrefPage(pPage);
......@@ -275,7 +275,10 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
SPage *pPage = NULL;
SPage *pPageH = NULL;
ASSERT(pTxn);
if (!pTxn) {
tdbError("tdb/pcache: null ptr pTxn, fetch impl failed.");
return NULL;
}
// 1. Search the hash table
pPage = pCache->pgHash[tdbPCachePageHash(pPgid) % pCache->nHash];
......@@ -315,8 +318,8 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
if (!pPage && pTxn->xMalloc != NULL) {
ret = tdbPageCreate(pCache->szPage, &pPage, pTxn->xMalloc, pTxn->xArg);
if (ret < 0 || pPage == NULL) {
// TODO
ASSERT(0);
tdbError("tdb/pcache: ret: %" PRId32 " pPage: %p, page create failed.", ret, pPage);
// TODO: recycle other backup pages
return NULL;
}
......@@ -370,7 +373,11 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
if (pPage->pLruNext != NULL) {
ASSERT(tdbGetPageRef(pPage) == 0);
int32_t nRef = tdbGetPageRef(pPage);
if (nRef != 0) {
tdbError("tdb/pcache: pin page's ref not zero: %" PRId32, nRef);
return;
}
pPage->pLruPrev->pLruNext = pPage->pLruNext;
pPage->pLruNext->pLruPrev = pPage->pLruPrev;
......@@ -383,13 +390,23 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
}
static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
i32 nRef;
ASSERT(pPage->isLocal);
ASSERT(!pPage->isDirty);
ASSERT(tdbGetPageRef(pPage) == 0);
ASSERT(pPage->pLruNext == NULL);
i32 nRef = tdbGetPageRef(pPage);
if (nRef != 0) {
tdbError("tdb/pcache: unpin page's ref not zero: %" PRId32, nRef);
return;
}
if (!pPage->isLocal) {
tdbError("tdb/pcache: unpin page's not local: %" PRIu8, pPage->isLocal);
return;
}
if (pPage->isDirty) {
tdbError("tdb/pcache: unpin page's dirty: %" PRIu8, pPage->isDirty);
return;
}
if (NULL != pPage->pLruNext) {
tdbError("tdb/pcache: unpin page's pLruNext not null.");
return;
}
tdbTrace("pCache:%p unpin page %p/%d, nPages:%d, pgno:%d, ", pCache, pPage, pPage->id, pCache->nPages,
TDB_PAGE_PGNO(pPage));
......
......@@ -43,9 +43,15 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t)
u8 *ptr;
int size;
ASSERT(xMalloc);
if (!xMalloc) {
tdbError("tdb/page-create: null xMalloc.");
return -1;
}
ASSERT(TDB_IS_PGSIZE_VLD(pageSize));
if (!TDB_IS_PGSIZE_VLD(pageSize)) {
tdbError("tdb/page-create: invalid pageSize: %d.", pageSize);
return -1;
}
*ppPage = NULL;
size = pageSize + sizeof(*pPage);
......@@ -69,16 +75,24 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t)
*ppPage = pPage;
tdbTrace("page/create: %p/%d %p", pPage, pPage->id, xMalloc);
tdbTrace("tdb/page-create: %p/%d %p", pPage, pPage->id, xMalloc);
return 0;
}
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) {
u8 *ptr;
tdbTrace("page/destroy: %p/%d %p", pPage, pPage->id, xFree);
ASSERT(!pPage->isDirty);
ASSERT(xFree);
tdbTrace("tdb/page-destroy: %p/%d %p", pPage, pPage->id, xFree);
if (pPage->isDirty) {
tdbError("tdb/page-destroy: dirty page: %" PRIu8 ".", pPage->isDirty);
return -1;
}
if (!xFree) {
tdbError("tdb/page-destroy: null xFree.");
return -1;
}
for (int iOvfl = 0; iOvfl < pPage->nOverflow; iOvfl++) {
tdbTrace("tdbPage/destroy/free ovfl cell: %p/%p", pPage->apOvfl[iOvfl], pPage);
......@@ -105,7 +119,10 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell
pPage->nOverflow = 0;
pPage->xCellSize = xCellSize;
ASSERT((u8 *)pPage->pPageFtr == pPage->pFreeEnd);
if ((u8 *)pPage->pPageFtr != pPage->pFreeEnd) {
tdbError("tdb/page-zero: invalid page, pFreeEnd: %p, pPageFtr: %p", pPage->pFreeEnd, pPage->pPageFtr);
return;
}
}
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt)) {
......@@ -121,8 +138,15 @@ void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell
pPage->nOverflow = 0;
pPage->xCellSize = xCellSize;
ASSERT(pPage->pFreeEnd >= pPage->pFreeStart);
ASSERT(pPage->pFreeEnd - pPage->pFreeStart <= TDB_PAGE_NFREE(pPage));
if (pPage->pFreeEnd < pPage->pFreeStart) {
tdbError("tdb/page-init: invalid page, pFreeEnd: %p, pFreeStart: %p", pPage->pFreeEnd, pPage->pFreeStart);
return;
}
if (pPage->pFreeEnd - pPage->pFreeStart > TDB_PAGE_NFREE(pPage)) {
tdbError("tdb/page-init: invalid page, pFreeEnd: %p, pFreeStart: %p, NFREE: %d", pPage->pFreeEnd, pPage->pFreeStart,
TDB_PAGE_NFREE(pPage));
return;
}
}
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl) {
......@@ -132,7 +156,11 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl
int lidx; // local idx
SCell *pNewCell;
ASSERT(szCell <= TDB_PAGE_MAX_FREE_BLOCK(pPage, pPage->pPageHdr - pPage->pData));
if (szCell > TDB_PAGE_MAX_FREE_BLOCK(pPage, pPage->pPageHdr - pPage->pData)) {
tdbError("tdb/page-insert-cell: invalid page, szCell: %d, max free: %lu", szCell,
TDB_PAGE_MAX_FREE_BLOCK(pPage, pPage->pPageHdr - pPage->pData));
return -1;
}
nFree = TDB_PAGE_NFREE(pPage);
nCells = TDB_PAGE_NCELLS(pPage);
......@@ -176,7 +204,11 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl
TDB_PAGE_CELL_OFFSET_AT_SET(pPage, lidx, pNewCell - pPage->pData);
TDB_PAGE_NCELLS_SET(pPage, nCells + 1);
ASSERT(pPage->pFreeStart == pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * (nCells + 1));
if (pPage->pFreeStart != pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * (nCells + 1)) {
tdbError("tdb/page-insert-cell: invalid page, pFreeStart: %p, pCellIdx: %p, nCells: %d", pPage->pFreeStart,
pPage->pCellIdx, nCells);
return -1;
}
}
for (; iOvfl < pPage->nOverflow; iOvfl++) {
......@@ -200,7 +232,10 @@ int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt) {
nCells = TDB_PAGE_NCELLS(pPage);
ASSERT(idx >= 0 && idx < nCells + pPage->nOverflow);
if (idx < 0 || idx >= nCells + pPage->nOverflow) {
tdbError("tdb/page-drop-cell: idx: %d out of range, nCells: %d, nOvfl: %d.", idx, nCells, pPage->nOverflow);
return -1;
}
iOvfl = 0;
for (; iOvfl < pPage->nOverflow; iOvfl++) {
......@@ -228,7 +263,10 @@ int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt) {
for (; iOvfl < pPage->nOverflow; iOvfl++) {
pPage->aiOvfl[iOvfl]--;
ASSERT(pPage->aiOvfl[iOvfl] > 0);
if (pPage->aiOvfl[iOvfl] <= 0) {
tdbError("tdb/page-drop-cell: invalid ai idx: %d", pPage->aiOvfl[iOvfl]);
return -1;
}
}
return 0;
......@@ -240,12 +278,19 @@ void tdbPageCopy(SPage *pFromPage, SPage *pToPage, int deepCopyOvfl) {
pToPage->pFreeStart = pToPage->pPageHdr + (pFromPage->pFreeStart - pFromPage->pPageHdr);
pToPage->pFreeEnd = (u8 *)(pToPage->pPageFtr) - ((u8 *)pFromPage->pPageFtr - pFromPage->pFreeEnd);
ASSERT(pToPage->pFreeEnd >= pToPage->pFreeStart);
if (pToPage->pFreeEnd < pToPage->pFreeStart) {
tdbError("tdb/page-copy: invalid to page, pFreeStart: %p, pFreeEnd: %p", pToPage->pFreeStart, pToPage->pFreeEnd);
return;
}
memcpy(pToPage->pPageHdr, pFromPage->pPageHdr, pFromPage->pFreeStart - pFromPage->pPageHdr);
memcpy(pToPage->pFreeEnd, pFromPage->pFreeEnd, (u8 *)pFromPage->pPageFtr - pFromPage->pFreeEnd);
ASSERT(TDB_PAGE_CCELLS(pToPage) == pToPage->pFreeEnd - pToPage->pData);
if (TDB_PAGE_CCELLS(pToPage) != pToPage->pFreeEnd - pToPage->pData) {
tdbError("tdb/page-copy: invalid to page, cell body: %d, range: %ld", TDB_PAGE_CCELLS(pToPage),
pToPage->pFreeEnd - pToPage->pData);
return;
}
delta = (pToPage->pPageHdr - pToPage->pData) - (pFromPage->pPageHdr - pFromPage->pData);
if (delta != 0) {
......@@ -295,8 +340,16 @@ static int tdbPageAllocate(SPage *pPage, int szCell, SCell **ppCell) {
*ppCell = NULL;
nFree = TDB_PAGE_NFREE(pPage);
ASSERT(nFree >= szCell + TDB_PAGE_OFFSET_SIZE(pPage));
ASSERT(TDB_PAGE_CCELLS(pPage) == pPage->pFreeEnd - pPage->pData);
if (nFree < szCell + TDB_PAGE_OFFSET_SIZE(pPage)) {
tdbError("tdb/page-allocate: invalid cell size, nFree: %d, szCell: %d, szOffset: %d", nFree, szCell,
TDB_PAGE_OFFSET_SIZE(pPage));
return -1;
}
if (TDB_PAGE_CCELLS(pPage) != pPage->pFreeEnd - pPage->pData) {
tdbError("tdb/page-allocate: invalid page, cell body: %d, range: %ld", TDB_PAGE_CCELLS(pPage),
pPage->pFreeEnd - pPage->pData);
return -1;
}
// 1. Try to allocate from the free space block area
if (pPage->pFreeEnd - pPage->pFreeStart >= szCell + TDB_PAGE_OFFSET_SIZE(pPage)) {
......@@ -308,7 +361,10 @@ static int tdbPageAllocate(SPage *pPage, int szCell, SCell **ppCell) {
// 2. Try to allocate from the page free list
cellFree = TDB_PAGE_FCELL(pPage);
ASSERT(cellFree == 0 || cellFree >= pPage->pFreeEnd - pPage->pData);
if (cellFree != 0 && cellFree < pPage->pFreeEnd - pPage->pData) {
tdbError("tdb/page-allocate: cellFree: %d, pFreeEnd: %p, pData: %p.", cellFree, pPage->pFreeEnd, pPage->pData);
return -1;
}
if (cellFree && pPage->pFreeEnd - pPage->pFreeStart >= TDB_PAGE_OFFSET_SIZE(pPage)) {
SCell *pPrevFreeCell = NULL;
int szPrevFreeCell;
......@@ -353,16 +409,30 @@ static int tdbPageAllocate(SPage *pPage, int szCell, SCell **ppCell) {
// 3. Try to dfragment and allocate again
tdbPageDefragment(pPage);
ASSERT(pPage->pFreeEnd - pPage->pFreeStart == nFree);
ASSERT(nFree == TDB_PAGE_NFREE(pPage));
ASSERT(pPage->pFreeEnd - pPage->pData == TDB_PAGE_CCELLS(pPage));
if (pPage->pFreeEnd - pPage->pFreeStart != nFree) {
tdbError("tdb/page-allocate: nFree: %d, pFreeStart: %p, pFreeEnd: %p.", nFree, pPage->pFreeStart, pPage->pFreeEnd);
return -1;
}
if (TDB_PAGE_NFREE(pPage) != nFree) {
tdbError("tdb/page-allocate: nFree: %d, page free: %d.", nFree, TDB_PAGE_NFREE(pPage));
return -1;
}
if (pPage->pFreeEnd - pPage->pData != TDB_PAGE_CCELLS(pPage)) {
tdbError("tdb/page-allocate: ccells: %d, pFreeStart: %p, pData: %p.", TDB_PAGE_CCELLS(pPage), pPage->pFreeStart,
pPage->pData);
return -1;
}
pPage->pFreeEnd -= szCell;
pCell = pPage->pFreeEnd;
TDB_PAGE_CCELLS_SET(pPage, pPage->pFreeEnd - pPage->pData);
_alloc_finish:
ASSERT(pCell);
if (NULL == pCell) {
tdbError("tdb/page-allocate: null ptr pCell.");
return -1;
}
pPage->pFreeStart += TDB_PAGE_OFFSET_SIZE(pPage);
TDB_PAGE_NFREE_SET(pPage, nFree - szCell - TDB_PAGE_OFFSET_SIZE(pPage));
*ppCell = pCell;
......@@ -375,9 +445,18 @@ static int tdbPageFree(SPage *pPage, int idx, SCell *pCell, int szCell) {
u8 *dest;
u8 *src;
ASSERT(pCell >= pPage->pFreeEnd);
ASSERT(pCell + szCell <= (u8 *)(pPage->pPageFtr));
ASSERT(pCell == TDB_PAGE_CELL_AT(pPage, idx));
if (pCell < pPage->pFreeEnd) {
tdbError("tdb/page-free: invalid cell, cell: %p, free end: %p", pCell, pPage->pFreeEnd);
return -1;
}
if (pCell + szCell > (u8 *)(pPage->pPageFtr)) {
tdbError("tdb/page-free: cell crosses page footer, cell: %p, size: %d footer: %p", pCell, szCell, pPage->pFreeEnd);
return -1;
}
if (pCell != TDB_PAGE_CELL_AT(pPage, idx)) {
tdbError("tdb/page-free: cell pos incorrect, cell: %p, pos: %p", pCell, TDB_PAGE_CELL_AT(pPage, idx));
return -1;
}
nFree = TDB_PAGE_NFREE(pPage);
......@@ -390,7 +469,8 @@ static int tdbPageFree(SPage *pPage, int idx, SCell *pCell, int szCell) {
pPage->pPageMethods->setFreeCellInfo(pCell, szCell, cellFree);
TDB_PAGE_FCELL_SET(pPage, pCell - pPage->pData);
} else {
ASSERT(0);
tdbError("tdb/page-free: invalid cell size: %d", szCell);
return -1;
}
}
......@@ -417,7 +497,10 @@ static int tdbPageDefragment(SPage *pPage) {
nFree = TDB_PAGE_NFREE(pPage);
nCells = TDB_PAGE_NCELLS(pPage);
ASSERT(pPage->pFreeEnd - pPage->pFreeStart < nFree);
if (pPage->pFreeEnd - pPage->pFreeStart >= nFree) {
tdbError("tdb/page-defragment: invalid free range, nFree: %d.", nFree);
return -1;
}
// Loop to compact the page content
// Here we use an O(n^2) algorithm to do the job since
......@@ -443,11 +526,19 @@ static int tdbPageDefragment(SPage *pPage) {
}
}
ASSERT(pCell != NULL);
if (NULL == pCell) {
tdbError("tdb/page-defragment: null ptr pCell.");
return -1;
}
szCell = (*pPage->xCellSize)(pPage, pCell, 0, NULL, NULL);
ASSERT(pCell + szCell <= pNextCell);
if (pCell + szCell > pNextCell) {
tdbError("tdb/page-defragment: invalid cell range, pCell: %p, szCell: %d, pNextCell: %p.", pCell, szCell,
pNextCell);
return -1;
}
if (pCell + szCell < pNextCell) {
memmove(pNextCell - szCell, pCell, szCell);
}
......@@ -457,7 +548,11 @@ static int tdbPageDefragment(SPage *pPage) {
TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, pNextCell - pPage->pData);
}
ASSERT(pPage->pFreeEnd - pPage->pFreeStart == nFree);
if (pPage->pFreeEnd - pPage->pFreeStart != nFree) {
tdbError("tdb/page-defragment: invalid free range, nFree: %d.", nFree);
return -1;
}
TDB_PAGE_CCELLS_SET(pPage, pPage->pFreeEnd - pPage->pData);
TDB_PAGE_FCELL_SET(pPage, 0);
......@@ -483,39 +578,59 @@ typedef struct {
// cellNum
static inline int getPageCellNum(SPage *pPage) { return ((SPageHdr *)(pPage->pPageHdr))[0].cellNum; }
static inline void setPageCellNum(SPage *pPage, int cellNum) {
ASSERT(cellNum < 65536);
if (cellNum >= 65536) {
tdbError("tdb/page-set-cell-num: invalid cellNum: %d.", cellNum);
return;
}
((SPageHdr *)(pPage->pPageHdr))[0].cellNum = (u16)cellNum;
}
// cellBody
static inline int getPageCellBody(SPage *pPage) { return ((SPageHdr *)(pPage->pPageHdr))[0].cellBody; }
static inline void setPageCellBody(SPage *pPage, int cellBody) {
ASSERT(cellBody < 65536);
if (cellBody >= 65536) {
tdbError("tdb/page-set-cell-body: invalid cellBody: %d.", cellBody);
return;
}
((SPageHdr *)(pPage->pPageHdr))[0].cellBody = (u16)cellBody;
}
// cellFree
static inline int getPageCellFree(SPage *pPage) { return ((SPageHdr *)(pPage->pPageHdr))[0].cellFree; }
static inline void setPageCellFree(SPage *pPage, int cellFree) {
ASSERT(cellFree < 65536);
if (cellFree >= 65536) {
tdbError("tdb/page-set-cell-free: invalid cellFree: %d.", cellFree);
return;
}
((SPageHdr *)(pPage->pPageHdr))[0].cellFree = (u16)cellFree;
}
// nFree
static inline int getPageNFree(SPage *pPage) { return ((SPageHdr *)(pPage->pPageHdr))[0].nFree; }
static inline void setPageNFree(SPage *pPage, int nFree) {
ASSERT(nFree < 65536);
if (nFree >= 65536) {
tdbError("tdb/page-set-nfree: invalid nFree: %d.", nFree);
return;
}
((SPageHdr *)(pPage->pPageHdr))[0].nFree = (u16)nFree;
}
// cell offset
static inline int getPageCellOffset(SPage *pPage, int idx) {
ASSERT(idx >= 0 && idx < getPageCellNum(pPage));
int cellNum = getPageCellNum(pPage);
if (idx < 0 || idx >= cellNum) {
tdbError("tdb/page-cell-offset: idx: %d out of range[%d, %d).", idx, 0, cellNum);
return -1;
}
return ((u16 *)pPage->pCellIdx)[idx];
}
static inline void setPageCellOffset(SPage *pPage, int idx, int offset) {
ASSERT(offset < 65536);
if (offset >= 65536) {
tdbError("tdb/page-set-cell-offset: invalid offset: %d.", offset);
return;
}
((u16 *)pPage->pCellIdx)[idx] = (u16)offset;
}
......@@ -590,7 +705,12 @@ static inline void setLPageNFree(SPage *pPage, int nFree) {
// cell offset
static inline int getLPageCellOffset(SPage *pPage, int idx) {
ASSERT(idx >= 0 && idx < getLPageCellNum(pPage));
int cellNum = getLPageCellNum(pPage);
if (idx < 0 || idx >= cellNum) {
tdbError("tdb/lpage-cell-offset: idx: %d out of range[%d, %d).", idx, 0, cellNum);
return -1;
}
return TDB_GET_U24(pPage->pCellIdx + 3 * idx);
}
......
......@@ -14,7 +14,7 @@
*/
#include "tdbInt.h"
/*
#pragma pack(push, 1)
typedef struct {
u8 hdrString[16];
......@@ -26,7 +26,7 @@ typedef struct {
#pragma pack(pop)
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");
*/
struct hashset_st {
size_t nbits;
size_t mask;
......@@ -234,7 +234,6 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
int ret;
SPage **ppPage;
// ASSERT(pPager->inTran);
if (pPage->isDirty) return 0;
// ref page one more time so the page will not be release
......@@ -243,23 +242,8 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
// Set page as dirty
pPage->isDirty = 1;
/*
// Add page to dirty list(TODO: NOT use O(n^2) algorithm)
for (ppPage = &pPager->pDirty; (*ppPage) && TDB_PAGE_PGNO(*ppPage) < TDB_PAGE_PGNO(pPage);
ppPage = &((*ppPage)->pDirtyNext)) {
}
if (*ppPage && TDB_PAGE_PGNO(*ppPage) == TDB_PAGE_PGNO(pPage)) {
tdbUnrefPage(pPage);
return 0;
}
ASSERT(*ppPage == NULL || TDB_PAGE_PGNO(*ppPage) > TDB_PAGE_PGNO(pPage));
pPage->pDirtyNext = *ppPage;
*ppPage = pPage;
*/
tdbTrace("put page: %p %d to dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
tdbTrace("tdb/pager-write: put page: %p %d to dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);
// Write page to journal if neccessary
......@@ -327,7 +311,11 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
pPage = (SPage *)pNode;
ASSERT(pPage->nOverflow == 0);
if (pPage->nOverflow != 0) {
tdbError("tdb/pager-commit: %p, pPage: %p, ovfl: %d, commit page failed.", pPager, pPage, pPage->nOverflow);
return -1;
}
ret = tdbPagerPWritePageToDB(pPager, pPage);
if (ret < 0) {
tdbError("failed to write page to db since %s", tstrerror(terrno));
......@@ -652,12 +640,15 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
loadPage = 0;
ret = tdbPagerAllocPage(pPager, &pgno);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno);
return -1;
}
}
ASSERT(pgno > 0);
if (pgno == 0) {
tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno);
return -1;
}
// fetch a page container
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
......@@ -671,7 +662,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
if (!TDB_PAGE_INITIALIZED(pPage)) {
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/pager: %p, pPage: %p, init page failed.", pPager, pPage);
return -1;
}
}
......@@ -679,8 +670,14 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
// printf("thread %" PRId64 " pager fetch page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage);
ASSERT(TDB_PAGE_INITIALIZED(pPage));
ASSERT(pPage->pPager == pPager);
if (!TDB_PAGE_INITIALIZED(pPage)) {
tdbError("tdb/pager: %p, pPage: %p, fetch page uninited.", pPager, pPage);
return -1;
}
if (pPage->pPager != pPager) {
tdbError("tdb/pager: %p/%p, fetch page failed.", pPager, pPage->pPager);
return -1;
}
*ppgno = pgno;
*ppPage = pPage;
......@@ -722,8 +719,10 @@ int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
return -1;
}
ASSERT(*ppgno != 0);
if (*ppgno == 0) {
tdbError("tdb/pager:%p, alloc new page failed.", pPager);
return -1;
}
return 0;
}
......@@ -752,7 +751,6 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1));
tdbTrace("tdb/pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead);
if (nRead < pPage->pageSize) {
ASSERT(0);
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32, pPager, pgno, nRead, pPage->pageSize);
TDB_UNLOCK_PAGE(pPage);
return -1;
......@@ -763,7 +761,8 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
ret = (*initPage)(pPage, arg, init);
if (ret < 0) {
ASSERT(0);
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " init page failed.", pPager, pgno, nRead,
pPage->pageSize);
TDB_UNLOCK_PAGE(pPage);
return -1;
}
......@@ -782,7 +781,8 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
}
}
} else {
ASSERT(0);
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " lock page failed.", pPager, pgno, nRead,
pPage->pageSize);
return -1;
}
......
......@@ -105,8 +105,6 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
#endif
ASSERT(pPager != NULL);
if (rollback) {
tdbPagerRollback(pPager);
} else {
......
......@@ -18,7 +18,10 @@
int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg,
int flags) {
// not support read-committed version at the moment
ASSERT(flags == 0 || flags == (TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
if (flags != 0 && flags != (TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)) {
tdbError("tdb/txn: invalid txn flags: %" PRId32, flags);
return -1;
}
pTxn->flags = flags;
pTxn->txnId = txnid;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册