未验证 提交 fb81b902 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #12698 from taosdata/fix/hzcheng_3.0

fix: tdb concurrent read/commit
......@@ -214,6 +214,8 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
}
}
pPager->dbOrigSize = pPager->dbFileSize;
// release the page
for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) {
pPager->pDirty = pPage->pDirtyNext;
......@@ -230,7 +232,6 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
// remote the journal file
tdbOsClose(pPager->jfd);
tdbOsRemove(pPager->jFileName);
pPager->dbOrigSize = pPager->dbFileSize;
pPager->inTran = 0;
return 0;
......
#include <gtest/gtest.h>
#define ALLOW_FORBID_FUNC
#include "os.h"
#include "tdb.h"
#include <string>
#include <thread>
#include <vector>
typedef struct SPoolMem {
int64_t size;
......@@ -480,4 +483,118 @@ TEST(tdb_test, simple_upsert1) {
tdbTbClose(pDb);
tdbClose(pEnv);
}
TEST(tdb_test, multi_thread_query) {
int ret;
TDB *pEnv;
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 20000;
TXN txn;
taosRemoveDir("tdb");
// Open Env
ret = tdbOpen("tdb", 512, 1, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
char key[64];
char val[64];
int64_t poolLimit = 4096; // 1M pool limit
int64_t txnid = 0;
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
txnid++;
txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED;
txn.txnId = -1;
txn.xMalloc = poolMalloc;
txn.xFree = poolFree;
txn.xArg = pPool;
// tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, );
tdbBegin(pEnv, &txn);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
auto f = [](TTB *pDb, int nData) {
TBC *pDBC;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int count = 0;
int ret;
TXN txn;
SPoolMem *pPool = openPool();
txn.flags = 0;
txn.txnId = 0;
txn.xMalloc = poolMalloc;
txn.xFree = poolFree;
txn.xArg = pPool;
ret = tdbTbcOpen(pDb, &pDBC, &txn);
GTEST_ASSERT_EQ(ret, 0);
tdbTbcMoveToFirst(pDBC);
for (;;) {
ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
// std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
// std::cout << std::endl;
count++;
}
GTEST_ASSERT_EQ(count, nData);
tdbTbcClose(pDBC);
tdbFree(pKey);
tdbFree(pVal);
};
// tdbCommit(pEnv, &txn);
// multi-thread query
int nThreads = 20;
std::vector<std::thread> threads;
for (int i = 0; i < nThreads; i++) {
if (i == 0) {
threads.push_back(std::thread(tdbCommit, pEnv, &txn));
} else {
threads.push_back(std::thread(f, pDb, nData));
}
}
for (auto &th : threads) {
th.join();
}
// commit the transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册