Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f5992851
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f5992851
编写于
11月 04, 2021
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add tqHandleCopyPut interface
上级
e50299ce
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
79 addition
and
7 deletion
+79
-7
source/dnode/vnode/tq/inc/tqMetaStore.h
source/dnode/vnode/tq/inc/tqMetaStore.h
+3
-1
source/dnode/vnode/tq/src/tqMetaStore.c
source/dnode/vnode/tq/src/tqMetaStore.c
+64
-3
source/dnode/vnode/tq/test/tqMetaTest.cpp
source/dnode/vnode/tq/test/tqMetaTest.cpp
+12
-3
未找到文件。
source/dnode/vnode/tq/inc/tqMetaStore.h
浏览文件 @
f5992851
...
...
@@ -60,6 +60,7 @@ typedef struct TqMetaStore {
TqMetaList
*
unpersistHead
;
int
fileFd
;
//TODO:temporaral use, to be replaced by unified tfile
int
idxFd
;
//TODO:temporaral use, to be replaced by unified tfile
char
*
dirPath
;
int
(
*
serializer
)(
const
void
*
pObj
,
void
**
ppBytes
);
const
void
*
(
*
deserializer
)(
const
void
*
pBytes
,
void
**
ppObj
);
void
(
*
deleter
)(
void
*
);
...
...
@@ -75,7 +76,8 @@ int32_t tqStoreClose(TqMetaStore*);
int32_t
tqStorePersist
(
TqMetaStore
*
);
void
*
tqHandleGet
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandlePut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
int32_t
tqHandleMovePut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
int32_t
tqHandleCopyPut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
,
size_t
vsize
);
//do commit
int32_t
tqHandleCommit
(
TqMetaStore
*
,
int64_t
key
);
//delete uncommitted
...
...
source/dnode/vnode/tq/src/tqMetaStore.c
浏览文件 @
f5992851
...
...
@@ -44,6 +44,12 @@ TqMetaStore* tqStoreOpen(const char* path,
//concat data file name and index file name
size_t
pathLen
=
strlen
(
path
);
pMeta
->
dirPath
=
malloc
(
pathLen
+
1
);
if
(
pMeta
->
dirPath
!=
NULL
)
{
//TODO: memory insufficient
}
strcpy
(
pMeta
->
dirPath
,
path
);
char
name
[
pathLen
+
10
];
strcpy
(
name
,
path
);
...
...
@@ -155,15 +161,35 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
pNode
=
next
;
}
}
free
(
pMeta
->
dirPath
);
free
(
pMeta
->
unpersistHead
);
free
(
pMeta
);
return
0
;
}
int32_t
tqStoreDelete
(
TqMetaStore
*
pMeta
)
{
//close file
//delete file
close
(
pMeta
->
fileFd
);
close
(
pMeta
->
idxFd
);
//free memory
for
(
int
i
=
0
;
i
<
TQ_BUCKET_SIZE
;
i
++
)
{
TqMetaList
*
pNode
=
pMeta
->
bucket
[
i
];
pMeta
->
bucket
[
i
]
=
NULL
;
while
(
pNode
)
{
if
(
pNode
->
handle
.
valueInTxn
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
if
(
pNode
->
handle
.
valueInUse
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
TqMetaList
*
next
=
pNode
->
next
;
free
(
pNode
);
pNode
=
next
;
}
}
free
(
pMeta
->
unpersistHead
);
taosRemoveDir
(
pMeta
->
dirPath
);
free
(
pMeta
->
dirPath
);
free
(
pMeta
);
return
0
;
}
...
...
@@ -301,7 +327,7 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
return
NULL
;
}
int32_t
tqHandlePut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int32_t
tqHandle
Move
Put
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
...
...
@@ -330,6 +356,41 @@ int32_t tqHandlePut(TqMetaStore* pMeta, int64_t key, void* value) {
return
0
;
}
int32_t
tqHandleCopyPut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
,
size_t
vsize
)
{
void
*
vmem
=
malloc
(
vsize
);
if
(
vmem
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memcpy
(
vmem
,
value
,
vsize
);
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
//TODO: think about thread safety
if
(
pNode
->
handle
.
valueInTxn
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
//change pointer ownership
pNode
->
handle
.
valueInTxn
=
vmem
;
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
TqMetaList
*
pNewNode
=
malloc
(
sizeof
(
TqMetaList
));
if
(
pNewNode
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInTxn
=
vmem
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNewNode
;
return
0
;
}
static
void
*
tqHandleGetUncommitted
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
...
...
source/dnode/vnode/tq/test/tqMetaTest.cpp
浏览文件 @
f5992851
...
...
@@ -47,10 +47,19 @@ class TqMetaTest : public ::testing::Test {
const
char
*
pathName
=
"/tmp/tq_test"
;
};
TEST_F
(
TqMetaTest
,
copyPutTest
)
{
Foo
foo
;
foo
.
a
=
3
;
tqHandleCopyPut
(
pMeta
,
1
,
&
foo
,
sizeof
(
Foo
));
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaTest
,
persistTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
2
;
tqHandlePut
(
pMeta
,
1
,
pFoo
);
tqHandle
Move
Put
(
pMeta
,
1
,
pFoo
);
Foo
*
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pBar
==
NULL
,
true
);
tqHandleCommit
(
pMeta
,
1
);
...
...
@@ -77,7 +86,7 @@ TEST_F(TqMetaTest, persistTest) {
TEST_F
(
TqMetaTest
,
uncommittedTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandlePut
(
pMeta
,
1
,
pFoo
);
tqHandle
Move
Put
(
pMeta
,
1
,
pFoo
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
...
...
@@ -86,7 +95,7 @@ TEST_F(TqMetaTest, uncommittedTest) {
TEST_F
(
TqMetaTest
,
abortTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandlePut
(
pMeta
,
1
,
pFoo
);
tqHandle
Move
Put
(
pMeta
,
1
,
pFoo
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录