提交 2872a50e 编写于 作者: M Minglei Jin

tdb/ofp: support overflow pages for big data

上级 c1de4df9
此差异已折叠。
......@@ -82,7 +82,8 @@ int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg)
return 0;
}
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)) {
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int,
TXN *, SBTree *pBt)) {
pPage->pPageHdr = pPage->pData + szAmHdr;
TDB_PAGE_NCELLS_SET(pPage, 0);
TDB_PAGE_CCELLS_SET(pPage, pPage->pageSize - sizeof(SPageFtr));
......@@ -98,7 +99,8 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell
ASSERT((u8 *)pPage->pPageFtr == pPage->pFreeEnd);
}
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)) {
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int,
TXN *, SBTree *pBt)) {
pPage->pPageHdr = pPage->pData + szAmHdr;
pPage->pCellIdx = pPage->pPageHdr + TDB_PAGE_HDR_SIZE(pPage);
pPage->pFreeStart = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * TDB_PAGE_NCELLS(pPage);
......@@ -171,12 +173,12 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl
return 0;
}
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell) {
tdbPageDropCell(pPage, idx);
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell, TXN *pTxn, SBTree *pBt) {
tdbPageDropCell(pPage, idx, pTxn, pBt);
return tdbPageInsertCell(pPage, idx, pCell, szCell, 0);
}
int tdbPageDropCell(SPage *pPage, int idx) {
int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt) {
int lidx;
SCell *pCell;
int szCell;
......@@ -205,7 +207,7 @@ int tdbPageDropCell(SPage *pPage, int idx) {
lidx = idx - iOvfl;
pCell = TDB_PAGE_CELL_AT(pPage, lidx);
szCell = (*pPage->xCellSize)(pPage, pCell);
szCell = (*pPage->xCellSize)(pPage, pCell, 1, pTxn, pBt);
tdbPageFree(pPage, lidx, pCell, szCell);
TDB_PAGE_NCELLS_SET(pPage, nCells - 1);
......@@ -420,7 +422,7 @@ static int tdbPageDefragment(SPage *pPage) {
ASSERT(pCell != NULL);
szCell = (*pPage->xCellSize)(pPage, pCell);
szCell = (*pPage->xCellSize)(pPage, pCell, 0, NULL, NULL);
ASSERT(pCell + szCell <= pNextCell);
if (pCell + szCell < pNextCell) {
......
......@@ -116,13 +116,25 @@ typedef struct SBtInfo {
int nData;
} SBtInfo;
#define TDB_CELLD_F_NIL 0x0
#define TDB_CELLD_F_KEY 0x1
#define TDB_CELLD_F_VAL 0x2
#define TDB_CELLDECODER_SET_FREE_NIL(pCellDecoder) ((pCellDecoder)->freeKV = TDB_CELLD_F_NIL)
#define TDB_CELLDECODER_SET_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_KEY)
#define TDB_CELLDECODER_SET_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_VAL)
#define TDB_CELLDECODER_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_KEY)
#define TDB_CELLDECODER_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_VAL)
typedef struct {
int kLen;
const u8 *pKey;
u8 *pKey;
int vLen;
const u8 *pVal;
u8 *pVal;
SPgno pgno;
u8 *pBuf;
u8 freeKV;
} SCellDecoder;
struct SBTC {
......@@ -250,7 +262,7 @@ struct SPage {
int vLen; // value length of the page, -1 for unknown
int maxLocal;
int minLocal;
int (*xCellSize)(const SPage *, SCell *);
int (*xCellSize)(const SPage *, SCell *, int, TXN *pTxn, SBTree *pBt);
// Fields used by SPCache
TDB_PCACHE_PAGE
};
......@@ -297,16 +309,18 @@ static inline int tdbTryLockPage(tdb_spinlock_t *pLock) {
#define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx)
#define TDB_PAGE_FREE_SIZE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
#define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno)
#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell) + (pPage)->pPageMethods->szOffset)
#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell, 0, NULL, NULL) + (pPage)->pPageMethods->szOffset)
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg);
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg);
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *));
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *));
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int,
TXN *, SBTree *pBt));
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int,
TXN *, SBTree *pBt));
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl);
int tdbPageDropCell(SPage *pPage, int idx);
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell);
int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt);
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell, TXN *pTxn, SBTree *pBt);
void tdbPageCopy(SPage *pFromPage, SPage *pToPage);
int tdbPageCapacity(int pageSize, int amHdrSize);
......
......@@ -4,4 +4,9 @@ target_link_libraries(tdbTest tdb gtest gtest_main)
# tdbUtilTest
add_executable(tdbUtilTest "tdbUtilTest.cpp")
target_link_libraries(tdbUtilTest tdb gtest gtest_main)
\ No newline at end of file
target_link_libraries(tdbUtilTest tdb gtest gtest_main)
# tdbUtilTest
add_executable(tdbExOVFLTest "tdbExOVFLTest.cpp")
target_link_libraries(tdbExOVFLTest tdb gtest gtest_main)
#include <gtest/gtest.h>
#define ALLOW_FORBID_FUNC
#include "os.h"
#include "tdb.h"
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>
typedef struct SPoolMem {
int64_t size;
struct SPoolMem *prev;
struct SPoolMem *next;
} SPoolMem;
static SPoolMem *openPool() {
SPoolMem *pPool = (SPoolMem *)taosMemoryMalloc(sizeof(*pPool));
pPool->prev = pPool->next = pPool;
pPool->size = 0;
return pPool;
}
static void clearPool(SPoolMem *pPool) {
SPoolMem *pMem;
do {
pMem = pPool->next;
if (pMem == pPool) break;
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
taosMemoryFree(pMem);
} while (1);
assert(pPool->size == 0);
}
static void closePool(SPoolMem *pPool) {
clearPool(pPool);
taosMemoryFree(pPool);
}
static void *poolMalloc(void *arg, size_t size) {
void *ptr = NULL;
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = (SPoolMem *)taosMemoryMalloc(sizeof(*pMem) + size);
if (pMem == NULL) {
assert(0);
}
pMem->size = sizeof(*pMem) + size;
pMem->next = pPool->next;
pMem->prev = pPool;
pPool->next->prev = pMem;
pPool->next = pMem;
pPool->size += pMem->size;
ptr = (void *)(&pMem[1]);
return ptr;
}
static void poolFree(void *arg, void *ptr) {
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = &(((SPoolMem *)ptr)[-1]);
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
taosMemoryFree(pMem);
}
static int tKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
int k1, k2;
std::string s1((char *)pKey1 + 3, kLen1 - 3);
std::string s2((char *)pKey2 + 3, kLen2 - 3);
k1 = stoi(s1);
k2 = stoi(s2);
if (k1 < k2) {
return -1;
} else if (k1 > k2) {
return 1;
} else {
return 0;
}
}
static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) {
int mlen;
int cret;
ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL);
mlen = keyLen1 < keyLen2 ? keyLen1 : keyLen2;
cret = memcmp(pKey1, pKey2, mlen);
if (cret == 0) {
if (keyLen1 < keyLen2) {
cret = -1;
} else if (keyLen1 > keyLen2) {
cret = 1;
} else {
cret = 0;
}
}
return cret;
}
TEST(TdbOVFLPagesTest, TbUpsertTest) {
}
TEST(TdbOVFLPagesTest, TbPGetTest) {
}
static void generateBigVal(char *val, int valLen) {
for (int i = 0; i < valLen; ++i) {
char c = char(i & 0xff);
if (c == 0) {
c = 1;
}
val[i] = c;
}
}
static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) {
TDB *pEnv = NULL;
int ret = tdbOpen(envName, pageSize, pageNum, &pEnv);
if (ret) {
pEnv = NULL;
}
return pEnv;
}
static void insertOfp(void) {
int ret = 0;
taosRemoveDir("tdb");
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN txn;
int64_t txnid = 0;
++txnid;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
// generate value payload
char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
// insert the generated big data
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn);
GTEST_ASSERT_EQ(ret, 0);
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
}
//TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) {
TEST(TdbOVFLPagesTest, TbInsertTest) {
insertOfp();
}
//TEST(TdbOVFLPagesTest, DISABLED_TbGetTest) {
TEST(TdbOVFLPagesTest, TbGetTest) {
insertOfp();
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
// generate value payload
char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
{ // Query the data
void *pVal = NULL;
int vLen;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, valLen);
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
tdbFree(pVal);
}
}
TEST(TdbOVFLPagesTest, TbDeleteTest) {
int ret = 0;
taosRemoveDir("tdb");
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN txn;
int64_t txnid = 0;
++txnid;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
// generate value payload
char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
{ // insert the generated big data
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the data
void *pVal = NULL;
int vLen;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, valLen);
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
tdbFree(pVal);
}
/* open to debug committed file
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
++txnid;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
*/
{ // upsert the data
ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the upserted data
void *pVal = NULL;
int vLen;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, strlen("value1"));
GTEST_ASSERT_EQ(memcmp("value1", pVal, vLen), 0);
tdbFree(pVal);
}
{ // delete the data
ret = tdbTbDelete(pDb, "key1", strlen("key1"), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the deleted data
void *pVal = NULL;
int vLen = -1;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == -1);
GTEST_ASSERT_EQ(ret, -1);
GTEST_ASSERT_EQ(vLen, -1);
GTEST_ASSERT_EQ(pVal, nullptr);
tdbFree(pVal);
}
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
}
TEST(tdb_test, DISABLED_simple_insert1) {
//TEST(tdb_test, simple_insert1) {
int ret;
TDB *pEnv;
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 1;
TXN txn;
int const pageSize = 4096;
taosRemoveDir("tdb");
// Open Env
ret = tdbOpen("tdb", pageSize, 64, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
{
char key[64];
//char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[(4083 - 4 - 3 - 2)+1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
int64_t txnid = 0;
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
txnid++;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key0");
sprintf(val, "value%d", iData);
//ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
//GTEST_ASSERT_EQ(ret, 0);
// generate value payload
int valLen = sizeof(val) / sizeof(val[0]);
for (int i = 6; i < valLen; ++i) {
char c = char(i & 0xff);
if (c == 0) {
c = 1;
}
val[i] = c;
}
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
// start a new transaction
clearPool(pPool);
txnid++;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
}
}
// commit the transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
{ // Query the data
void *pVal = NULL;
int vLen;
for (int i = 1; i <= nData; i++) {
sprintf(key, "key%d", i);
sprintf(val, "value%d", i);
ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, strlen(val));
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
}
tdbFree(pVal);
}
{ // Iterate to query the DB data
TBC *pDBC;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int count = 0;
ret = tdbTbcOpen(pDb, &pDBC, NULL);
GTEST_ASSERT_EQ(ret, 0);
tdbTbcMoveToFirst(pDBC);
for (;;) {
ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
// std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
// std::cout << std::endl;
count++;
}
GTEST_ASSERT_EQ(count, nData);
tdbTbcClose(pDBC);
tdbFree(pKey);
tdbFree(pVal);
}
}
ret = tdbTbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册