Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
07e54820
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
07e54820
编写于
5月 20, 2022
作者:
H
Hongze Cheng
提交者:
GitHub
5月 20, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12750 from taosdata/fix/hzcheng_3.0
fix:tdb concurrency
上级
292735a1
b563902b
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
60 addition
and
35 deletion
+60
-35
source/libs/tdb/src/db/tdbPCache.c
source/libs/tdb/src/db/tdbPCache.c
+36
-22
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+12
-1
source/libs/tdb/src/inc/tdbInt.h
source/libs/tdb/src/inc/tdbInt.h
+7
-7
source/libs/tdb/test/tdbTest.cpp
source/libs/tdb/test/tdbTest.cpp
+5
-5
未找到文件。
source/libs/tdb/src/db/tdbPCache.c
浏览文件 @
07e54820
...
...
@@ -14,6 +14,9 @@
*/
#include "tdbInt.h"
// #include <sys/types.h>
// #include <unistd.h>
struct
SPCache
{
int
szPage
;
int
nPages
;
...
...
@@ -32,7 +35,6 @@ static inline uint32_t tdbPCachePageHash(const SPgid *pPgid) {
uint32_t
*
t
=
(
uint32_t
*
)((
pPgid
)
->
fileid
);
return
(
uint32_t
)(
t
[
0
]
+
t
[
1
]
+
t
[
2
]
+
t
[
3
]
+
t
[
4
]
+
t
[
5
]
+
(
pPgid
)
->
pgno
);
}
#define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL)
static
int
tdbPCacheOpenImpl
(
SPCache
*
pCache
);
static
SPage
*
tdbPCacheFetchImpl
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
);
...
...
@@ -80,16 +82,22 @@ int tdbPCacheClose(SPCache *pCache) {
SPage
*
tdbPCacheFetch
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
)
{
SPage
*
pPage
;
i32
nRef
;
tdbPCacheLock
(
pCache
);
pPage
=
tdbPCacheFetchImpl
(
pCache
,
pPgid
,
pTxn
);
if
(
pPage
)
{
tdbRefPage
(
pPage
);
nRef
=
tdbRefPage
(
pPage
);
}
ASSERT
(
pPage
);
tdbPCacheUnlock
(
pCache
);
// printf("thread %" PRId64 " fetch page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage, nRef);
return
pPage
;
}
...
...
@@ -98,30 +106,31 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
ASSERT
(
pTxn
);
nRef
=
tdbUnrefPage
(
pPage
);
ASSERT
(
nRef
>=
0
);
//
nRef = tdbUnrefPage(pPage);
//
ASSERT(nRef >= 0);
tdbPCacheLock
(
pCache
);
nRef
=
tdbUnrefPage
(
pPage
);
if
(
nRef
==
0
)
{
tdbPCacheLock
(
pCache
);
// test the nRef again to make sure
// it is safe th handle the page
nRef
=
tdbGetPageRef
(
pPage
);
if
(
nRef
==
0
)
{
if
(
pPage
->
isLocal
)
{
tdbPCacheUnpinPage
(
pCache
,
pPage
);
}
else
{
if
(
TDB_TXN_IS_WRITE
(
pTxn
))
{
// remove from hash
tdbPCacheRemovePageFromHash
(
pCache
,
pPage
);
}
tdbPageDestroy
(
pPage
,
pTxn
->
xFree
,
pTxn
->
xArg
);
// nRef = tdbGetPageRef(pPage);
// if (nRef == 0) {
if
(
pPage
->
isLocal
)
{
tdbPCacheUnpinPage
(
pCache
,
pPage
);
}
else
{
if
(
TDB_TXN_IS_WRITE
(
pTxn
))
{
// remove from hash
tdbPCacheRemovePageFromHash
(
pCache
,
pPage
);
}
}
tdbPCacheUnlock
(
pCache
);
tdbPageDestroy
(
pPage
,
pTxn
->
xFree
,
pTxn
->
xArg
);
}
// }
}
tdbPCacheUnlock
(
pCache
);
// printf("thread %" PRId64 " relas page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage, nRef);
}
int
tdbPCacheGetPageSize
(
SPCache
*
pCache
)
{
return
pCache
->
szPage
;
}
...
...
@@ -223,6 +232,7 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
pCache
->
nRecyclable
--
;
// printf("pin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
tdbTrace
(
"pin page %d"
,
pPage
->
id
);
}
}
...
...
@@ -243,6 +253,7 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
pCache
->
nRecyclable
++
;
// printf("unpin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
tdbTrace
(
"unpin page %d"
,
pPage
->
id
);
}
...
...
@@ -253,10 +264,12 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
h
=
tdbPCachePageHash
(
&
(
pPage
->
pgid
));
for
(
ppPage
=
&
(
pCache
->
pgHash
[
h
%
pCache
->
nHash
]);
(
*
ppPage
)
&&
*
ppPage
!=
pPage
;
ppPage
=
&
((
*
ppPage
)
->
pHashNext
))
;
ASSERT
(
*
ppPage
==
pPage
);
*
ppPage
=
pPage
->
pHashNext
;
pCache
->
nPage
--
;
if
(
*
ppPage
)
{
*
ppPage
=
pPage
->
pHashNext
;
pCache
->
nPage
--
;
// printf("rmv page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
}
tdbTrace
(
"remove page %d to hash"
,
pPage
->
id
);
}
...
...
@@ -271,6 +284,7 @@ static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
pCache
->
nPage
++
;
// printf("add page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
tdbTrace
(
"add page %d to hash"
,
pPage
->
id
);
}
...
...
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
07e54820
...
...
@@ -265,6 +265,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
pgid
.
pgno
=
pgno
;
pPage
=
tdbPCacheFetch
(
pPager
->
pCache
,
&
pgid
,
pTxn
);
if
(
pPage
==
NULL
)
{
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -272,10 +273,14 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
if
(
!
TDB_PAGE_INITIALIZED
(
pPage
))
{
ret
=
tdbPagerInitPage
(
pPager
,
pPage
,
initPage
,
arg
,
loadPage
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
}
// printf("thread %" PRId64 " pager fetch page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage);
ASSERT
(
TDB_PAGE_INITIALIZED
(
pPage
));
ASSERT
(
pPage
->
pPager
==
pPager
);
...
...
@@ -284,7 +289,11 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
return
0
;
}
void
tdbPagerReturnPage
(
SPager
*
pPager
,
SPage
*
pPage
,
TXN
*
pTxn
)
{
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
}
void
tdbPagerReturnPage
(
SPager
*
pPager
,
SPage
*
pPage
,
TXN
*
pTxn
)
{
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
// printf("thread %" PRId64 " pager retun page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage);
}
static
int
tdbPagerAllocFreePage
(
SPager
*
pPager
,
SPgno
*
ppgno
)
{
// TODO: Allocate a page from the free list
...
...
@@ -352,6 +361,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
ret
=
(
*
initPage
)(
pPage
,
arg
,
init
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
TDB_UNLOCK_PAGE
(
pPage
);
return
-
1
;
}
...
...
@@ -370,6 +380,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
}
}
}
else
{
ASSERT
(
0
);
return
-
1
;
}
...
...
source/libs/tdb/src/inc/tdbInt.h
浏览文件 @
07e54820
...
...
@@ -275,15 +275,15 @@ static inline i32 tdbUnrefPage(SPage *pPage) {
#define P_LOCK_FAIL -1
static
inline
int
tdbTryLockPage
(
tdb_spinlock_t
*
pLock
)
{
int
ret
;
if
(
tdbSpinlockTrylock
(
pLock
)
==
0
)
{
ret
=
P_LOCK_SUCC
;
}
else
if
(
errno
==
EBUSY
)
{
ret
=
P_LOCK_BUSY
;
int
ret
=
tdbSpinlockTrylock
(
pLock
)
;
if
(
ret
==
0
)
{
ret
urn
P_LOCK_SUCC
;
}
else
if
(
ret
==
EBUSY
)
{
ret
urn
P_LOCK_BUSY
;
}
else
{
ret
=
P_LOCK_FAIL
;
ASSERT
(
0
);
return
P_LOCK_FAIL
;
}
return
ret
;
}
#define TDB_INIT_PAGE_LOCK(pPage) tdbSpinlockInit(&((pPage)->lock), 0)
...
...
source/libs/tdb/test/tdbTest.cpp
浏览文件 @
07e54820
...
...
@@ -486,18 +486,18 @@ TEST(tdb_test, DISABLED_simple_upsert1) {
tdbClose
(
pEnv
);
}
TEST
(
tdb_test
,
DISABLED_
multi_thread_query
)
{
TEST
(
tdb_test
,
multi_thread_query
)
{
int
ret
;
TDB
*
pEnv
;
TTB
*
pDb
;
tdb_cmpr_fn_t
compFunc
;
int
nData
=
100000
;
int
nData
=
100000
0
;
TXN
txn
;
taosRemoveDir
(
"tdb"
);
// Open Env
ret
=
tdbOpen
(
"tdb"
,
512
,
1
,
&
pEnv
);
ret
=
tdbOpen
(
"tdb"
,
4096
,
10
,
&
pEnv
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// Create a database
...
...
@@ -507,7 +507,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) {
char
key
[
64
];
char
val
[
64
];
int64_t
poolLimit
=
4096
;
// 1M pool limit
int64_t
poolLimit
=
4096
*
20
;
// 1M pool limit
int64_t
txnid
=
0
;
SPoolMem
*
pPool
;
...
...
@@ -600,7 +600,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) {
GTEST_ASSERT_EQ
(
ret
,
0
);
}
TEST
(
tdb_test
,
multi_thread1
)
{
TEST
(
tdb_test
,
DISABLED_
multi_thread1
)
{
#if 0
int ret;
TDB *pDb;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录