提交 7c5c1c32 编写于 作者: H Hongze Cheng

implement TDB upsert

上级 8c44ea53
...@@ -42,7 +42,7 @@ int tdbDbClose(TDB *pDb); ...@@ -42,7 +42,7 @@ int tdbDbClose(TDB *pDb);
int tdbDbDrop(TDB *pDb); int tdbDbDrop(TDB *pDb);
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn); int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn); int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn);
int tdbUpsert(TDB *pTDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn); int tdbDbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen); int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
......
...@@ -138,71 +138,59 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in ...@@ -138,71 +138,59 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
} }
if (btc.idx == -1) { if (btc.idx == -1) {
idx = 0; btc.idx = 0;
} else { } else {
if (c > 0) { if (c > 0) {
idx = btc.idx + 1; btc.idx++;
} else if (c < 0) { } else if (c == 0) {
idx = btc.idx; // dup key not allowed
} else {
// TDB does NOT allow same key
tdbBtcClose(&btc);
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
} }
// make sure enough space to hold the cell ret = tdbBtcUpsert(&btc, pKey, kLen, pVal, vLen, 1);
szBuf = kLen + vLen + 14; if (ret < 0) {
pBuf = tdbRealloc(pBt->pBuf, pBt->pageSize > szBuf ? szBuf : pBt->pageSize);
if (pBuf == NULL) {
tdbBtcClose(&btc);
ASSERT(0); ASSERT(0);
tdbBtcClose(&btc);
return -1; return -1;
} }
pBt->pBuf = pBuf;
pCell = (SCell *)pBt->pBuf;
// encode cell tdbBtcClose(&btc);
ret = tdbBtreeEncodeCell(btc.pPage, pKey, kLen, pVal, vLen, pCell, &szCell); return 0;
}
int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
SBTC btc;
int c;
int ret;
tdbBtcOpen(&btc, pBt, pTxn);
// move the cursor
ret = tdbBtcMoveTo(&btc, pKey, kLen, &c);
if (ret < 0) { if (ret < 0) {
tdbBtcClose(&btc); tdbBtcClose(&btc);
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
// mark the page dirty if (btc.idx < 0 || c != 0) {
ret = tdbPagerWrite(pBt->pPager, btc.pPage);
if (ret < 0) {
tdbBtcClose(&btc); tdbBtcClose(&btc);
ASSERT(0);
return -1; return -1;
} }
// insert the cell // delete the key
ret = tdbPageInsertCell(btc.pPage, idx, pCell, szCell, 0); if (tdbBtcDelete(&btc) < 0) {
if (ret < 0) {
tdbBtcClose(&btc); tdbBtcClose(&btc);
ASSERT(0);
return -1; return -1;
} }
// check if need balance
if (btc.pPage->nOverflow > 0) {
ret = tdbBtreeBalance(&btc);
if (ret < 0) {
tdbBtcClose(&btc);
ASSERT(0);
return -1;
}
}
tdbBtcClose(&btc); tdbBtcClose(&btc);
return 0; return 0;
} }
int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) { int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, int nData, TXN *pTxn) {
SBTC btc; SBTC btc;
int c; int c;
int ret; int ret;
...@@ -210,20 +198,25 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) { ...@@ -210,20 +198,25 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
tdbBtcOpen(&btc, pBt, pTxn); tdbBtcOpen(&btc, pBt, pTxn);
// move the cursor // move the cursor
ret = tdbBtcMoveTo(&btc, pKey, kLen, &c); ret = tdbBtcMoveTo(&btc, pKey, nKey, &c);
if (ret < 0) { if (ret < 0) {
tdbBtcClose(&btc);
ASSERT(0); ASSERT(0);
tdbBtcClose(&btc);
return -1; return -1;
} }
if (btc.idx < 0 || c != 0) { if (btc.idx == -1) {
tdbBtcClose(&btc); btc.idx = 0;
return -1; c = 1;
} else {
if (c > 0) {
btc.idx = btc.idx + 1;
}
} }
// delete the key ret = tdbBtcUpsert(&btc, pKey, nKey, pData, nData, c);
if (tdbBtcDelete(&btc) < 0) { if (ret < 0) {
ASSERT(0);
tdbBtcClose(&btc); tdbBtcClose(&btc);
return -1; return -1;
} }
...@@ -1475,9 +1468,64 @@ int tdbBtcDelete(SBTC *pBtc) { ...@@ -1475,9 +1468,64 @@ int tdbBtcDelete(SBTC *pBtc) {
return 0; return 0;
} }
int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pVal, int vLen) { int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int nData, int insert) {
ASSERT(0); SCell *pCell;
// TODO int szCell;
int nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
int szBuf;
void *pBuf;
int ret;
ASSERT(pBtc->idx >= 0);
// alloc space
szBuf = kLen + nData + 14;
pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize);
if (pBuf == NULL) {
ASSERT(0);
return -1;
}
pBtc->pBt->pBuf = pBuf;
pCell = (SCell *)pBtc->pBt->pBuf;
// encode cell
ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pData, nData, pCell, &szCell);
if (ret < 0) {
ASSERT(0);
return -1;
}
// mark dirty
ret = tdbPagerWrite(pBtc->pBt->pPager, pBtc->pPage);
if (ret < 0) {
ASSERT(0);
return -1;
}
// insert or update
if (insert) {
ASSERT(pBtc->idx <= nCells);
ret = tdbPageInsertCell(pBtc->pPage, pBtc->idx, pCell, szCell, 0);
} else {
ASSERT(pBtc->idx < nCells);
ret = tdbPageUpdateCell(pBtc->pPage, pBtc->idx, pCell, szCell);
}
if (ret < 0) {
ASSERT(0);
return -1;
}
// check balance
if (pBtc->pPage->nOverflow > 0) {
ret = tdbBtreeBalance(pBtc);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
return 0; return 0;
} }
......
...@@ -81,9 +81,8 @@ int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int va ...@@ -81,9 +81,8 @@ int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int va
int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pDb->pBt, pKey, kLen, pTxn); } int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pDb->pBt, pKey, kLen, pTxn); }
int tdbUpsert(TDB *pTDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) { int tdbDbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
// TODO return tdbBtreeUpsert(pDb->pBt, pKey, kLen, pVal, vLen, pTxn);
return 0;
} }
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) { int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {
......
...@@ -129,6 +129,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, tdb_cmpr_fn_t kcmpr, SBT ...@@ -129,6 +129,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, tdb_cmpr_fn_t kcmpr, SBT
int tdbBtreeClose(SBTree *pBt); int tdbBtreeClose(SBTree *pBt);
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn); int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn); int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn);
int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, int nData, TXN *pTxn);
int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen); int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
...@@ -143,7 +144,7 @@ int tdbBtcMoveToPrev(SBTC *pBtc); ...@@ -143,7 +144,7 @@ int tdbBtcMoveToPrev(SBTC *pBtc);
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen); int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int *vLen); int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int *vLen);
int tdbBtcDelete(SBTC *pBtc); int tdbBtcDelete(SBTC *pBtc);
int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pVal, int vLen); int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int nData, int insert);
// tdbPager.c ==================================== // tdbPager.c ====================================
......
...@@ -115,7 +115,7 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in ...@@ -115,7 +115,7 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in
return cret; return cret;
} }
TEST(tdb_test, simple_test) { TEST(tdb_test, simple_insert1) {
int ret; int ret;
TENV *pEnv; TENV *pEnv;
TDB *pDb; TDB *pDb;
...@@ -235,7 +235,7 @@ TEST(tdb_test, simple_test) { ...@@ -235,7 +235,7 @@ TEST(tdb_test, simple_test) {
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
} }
TEST(tdb_test, simple_test2) { TEST(tdb_test, simple_insert2) {
int ret; int ret;
TENV *pEnv; TENV *pEnv;
TDB *pDb; TDB *pDb;
...@@ -413,6 +413,71 @@ TEST(tdb_test, simple_delete1) { ...@@ -413,6 +413,71 @@ TEST(tdb_test, simple_delete1) {
closePool(pPool); closePool(pPool);
tdbDbClose(pDb);
tdbEnvClose(pEnv);
}
TEST(tdb_test, simple_upsert1) {
int ret;
TENV *pEnv;
TDB *pDb;
int nData = 100000;
char key[64];
char data[64];
void *pData = NULL;
SPoolMem *pPool;
TXN txn;
taosRemoveDir("tdb");
// open env
ret = tdbEnvOpen("tdb", 4096, 64, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// open database
ret = tdbDbOpen("db.db", -1, -1, NULL, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
pPool = openPool();
// insert some data
tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
// query the data
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0);
}
// upsert some data
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d-u", iData);
ret = tdbDbUpsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
tdbCommit(pEnv, &txn);
// query the data
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d-u", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0);
}
tdbDbClose(pDb); tdbDbClose(pDb);
tdbEnvClose(pEnv); tdbEnvClose(pEnv);
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册