Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e4ff74fd
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e4ff74fd
编写于
11月 10, 2021
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add tq meta config
上级
771310d3
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
88 addition
and
53 deletion
+88
-53
source/dnode/vnode/tq/inc/tqMetaStore.h
source/dnode/vnode/tq/inc/tqMetaStore.h
+21
-1
source/dnode/vnode/tq/src/tqMetaStore.c
source/dnode/vnode/tq/src/tqMetaStore.c
+36
-35
source/dnode/vnode/tq/test/tqMetaTest.cpp
source/dnode/vnode/tq/test/tqMetaTest.cpp
+31
-17
未找到文件。
source/dnode/vnode/tq/inc/tqMetaStore.h
浏览文件 @
e4ff74fd
...
...
@@ -44,6 +44,21 @@ extern "C" {
#define TQ_SVER 0
//TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
#define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2
static
inline
bool
TqUpdateAppend
(
int32_t
tqConfigFlag
)
{
return
tqConfigFlag
&
TQ_UPDATE_APPEND
;
}
static
inline
bool
TqDupIntxnReject
(
int32_t
tqConfigFlag
)
{
return
tqConfigFlag
&
TQ_DUP_INTXN_REJECT
;
}
static
const
int8_t
TQ_CONST_DELETE
=
TQ_ACTION_CONST
;
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
...
...
@@ -79,6 +94,7 @@ typedef struct TqMetaStore {
int
fileFd
;
//TODO:temporaral use, to be replaced by unified tfile
int
idxFd
;
//TODO:temporaral use, to be replaced by unified tfile
char
*
dirPath
;
int32_t
tqConfigFlag
;
int
(
*
serializer
)(
const
void
*
pObj
,
TqSerializedHead
**
ppHead
);
const
void
*
(
*
deserializer
)(
const
TqSerializedHead
*
pHead
,
void
**
ppObj
);
void
(
*
deleter
)(
void
*
);
...
...
@@ -87,7 +103,9 @@ typedef struct TqMetaStore {
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
int
serializer
(
const
void
*
pObj
,
TqSerializedHead
**
ppHead
),
const
void
*
deserializer
(
const
TqSerializedHead
*
pHead
,
void
**
ppObj
),
void
deleter
(
void
*
pObj
));
void
deleter
(
void
*
pObj
),
int32_t
tqConfigFlag
);
int32_t
tqStoreClose
(
TqMetaStore
*
);
//int32_t tqStoreDelete(TqMetaStore*);
//int32_t TqStoreCommitAll(TqMetaStore*);
...
...
@@ -96,6 +114,8 @@ int32_t tqStorePersist(TqMetaStore*);
int32_t
tqStoreCompact
(
TqMetaStore
*
);
void
*
tqHandleGet
(
TqMetaStore
*
,
int64_t
key
);
//make it unpersist
void
*
tqHandleTouchGet
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleMovePut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
int32_t
tqHandleCopyPut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
,
size_t
vsize
);
//delete committed kv pair
...
...
source/dnode/vnode/tq/src/tqMetaStore.c
浏览文件 @
e4ff74fd
...
...
@@ -71,7 +71,9 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
int
serializer
(
const
void
*
pObj
,
TqSerializedHead
**
ppHead
),
const
void
*
deserializer
(
const
TqSerializedHead
*
pHead
,
void
**
ppObj
),
void
deleter
(
void
*
pObj
))
{
void
deleter
(
void
*
pObj
),
int32_t
tqConfigFlag
)
{
TqMetaStore
*
pMeta
=
malloc
(
sizeof
(
TqMetaStore
));
if
(
pMeta
==
NULL
)
{
//close
...
...
@@ -128,6 +130,7 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta
->
serializer
=
serializer
;
pMeta
->
deserializer
=
deserializer
;
pMeta
->
deleter
=
deleter
;
pMeta
->
tqConfigFlag
=
tqConfigFlag
;
//read idx file and load into memory
TqIdxPageBuf
idxBuf
;
...
...
@@ -463,56 +466,40 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
return
NULL
;
}
int32_t
tqHandleMovePut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
void
*
tqHandleTouchGet
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
//TODO: think about thread safety
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
if
(
pNode
->
handle
.
valueInUse
!=
NULL
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
tqLinkUnpersist
(
pMeta
,
pNode
);
return
pNode
->
handle
.
valueInUse
;
}
else
{
return
NULL
;
}
//change pointer ownership
pNode
->
handle
.
valueInTxn
=
value
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
TqMetaList
*
pNewNode
=
malloc
(
sizeof
(
TqMetaList
));
if
(
pNewNode
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInTxn
=
value
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNewNode
;
tqLinkUnpersist
(
pMeta
,
pNewNode
);
return
0
;
return
NULL
;
}
int32_t
tqHandleCopyPut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
,
size_t
vsize
)
{
void
*
vmem
=
malloc
(
vsize
);
if
(
vmem
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memcpy
(
vmem
,
value
,
vsize
);
static
inline
int32_t
tqHandlePutImpl
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
//TODO: think about thread safety
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
if
(
pNode
->
handle
.
valueInTxn
)
{
if
(
TqDupIntxnReject
(
pMeta
->
tqConfigFlag
))
{
return
-
2
;
}
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
}
//change pointer ownership
pNode
->
handle
.
valueInTxn
=
vmem
;
pNode
->
handle
.
valueInTxn
=
value
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
else
{
...
...
@@ -526,13 +513,27 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi
}
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInTxn
=
v
mem
;
pNewNode
->
handle
.
valueInTxn
=
v
alue
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNewNode
;
tqLinkUnpersist
(
pMeta
,
pNewNode
);
return
0
;
}
int32_t
tqHandleMovePut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
return
tqHandlePutImpl
(
pMeta
,
key
,
value
);
}
int32_t
tqHandleCopyPut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
,
size_t
vsize
)
{
void
*
vmem
=
malloc
(
vsize
);
if
(
vmem
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memcpy
(
vmem
,
value
,
vsize
);
return
tqHandlePutImpl
(
pMeta
,
key
,
vmem
);
}
static
void
*
tqHandleGetUncommitted
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
...
...
source/dnode/vnode/tq/test/tqMetaTest.cpp
浏览文件 @
e4ff74fd
...
...
@@ -32,13 +32,15 @@ void FooDeleter(void* pObj) {
free
(
pObj
);
}
class
TqMetaTest
:
public
::
testing
::
Test
{
class
TqMeta
UpdateAppend
Test
:
public
::
testing
::
Test
{
protected:
void
SetUp
()
override
{
taosRemoveDir
(
pathName
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
}
...
...
@@ -50,7 +52,7 @@ class TqMetaTest : public ::testing::Test {
const
char
*
pathName
=
"/tmp/tq_test"
;
};
TEST_F
(
TqMetaTest
,
copyPutTest
)
{
TEST_F
(
TqMeta
UpdateAppend
Test
,
copyPutTest
)
{
Foo
foo
;
foo
.
a
=
3
;
tqHandleCopyPut
(
pMeta
,
1
,
&
foo
,
sizeof
(
Foo
));
...
...
@@ -63,7 +65,7 @@ TEST_F(TqMetaTest, copyPutTest) {
EXPECT_EQ
(
pFoo
->
a
,
3
);
}
TEST_F
(
TqMetaTest
,
persistTest
)
{
TEST_F
(
TqMeta
UpdateAppend
Test
,
persistTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
2
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
...
...
@@ -77,7 +79,9 @@ TEST_F(TqMetaTest, persistTest) {
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
...
...
@@ -88,7 +92,7 @@ TEST_F(TqMetaTest, persistTest) {
EXPECT_EQ
(
pBar
==
NULL
,
true
);
}
TEST_F
(
TqMetaTest
,
uncommittedTest
)
{
TEST_F
(
TqMeta
UpdateAppend
Test
,
uncommittedTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
...
...
@@ -97,7 +101,7 @@ TEST_F(TqMetaTest, uncommittedTest) {
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaTest
,
abortTest
)
{
TEST_F
(
TqMeta
UpdateAppend
Test
,
abortTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
...
...
@@ -110,7 +114,7 @@ TEST_F(TqMetaTest, abortTest) {
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaTest
,
deleteTest
)
{
TEST_F
(
TqMeta
UpdateAppend
Test
,
deleteTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
...
...
@@ -135,14 +139,16 @@ TEST_F(TqMetaTest, deleteTest) {
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaTest
,
intxnPersist
)
{
TEST_F
(
TqMeta
UpdateAppend
Test
,
intxnPersist
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
...
...
@@ -157,7 +163,9 @@ TEST_F(TqMetaTest, intxnPersist) {
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
pFoo1
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
...
...
@@ -170,14 +178,16 @@ TEST_F(TqMetaTest, intxnPersist) {
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
pFoo1
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo1
->
a
,
4
);
}
TEST_F
(
TqMetaTest
,
multiplePage
)
{
TEST_F
(
TqMeta
UpdateAppend
Test
,
multiplePage
)
{
srand
(
0
);
std
::
vector
<
int
>
v
;
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
...
...
@@ -195,7 +205,9 @@ TEST_F(TqMetaTest, multiplePage) {
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
for
(
int
i
=
500
;
i
<
1000
;
i
++
)
{
...
...
@@ -213,7 +225,7 @@ TEST_F(TqMetaTest, multiplePage) {
}
TEST_F
(
TqMetaTest
,
multipleRewrite
)
{
TEST_F
(
TqMeta
UpdateAppend
Test
,
multipleRewrite
)
{
srand
(
0
);
std
::
vector
<
int
>
v
;
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
...
...
@@ -244,7 +256,9 @@ TEST_F(TqMetaTest, multipleRewrite) {
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
FooSerializer
,
FooDeserializer
,
FooDeleter
,
TQ_UPDATE_APPEND
);
ASSERT
(
pMeta
);
for
(
int
i
=
500
;
i
<
1000
;
i
++
)
{
...
...
@@ -263,7 +277,7 @@ TEST_F(TqMetaTest, multipleRewrite) {
}
TEST_F
(
TqMetaTest
,
dupCommit
)
{
TEST_F
(
TqMeta
UpdateAppend
Test
,
dupCommit
)
{
srand
(
0
);
std
::
vector
<
int
>
v
;
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录