Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
34581781
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
34581781
编写于
11月 09, 2021
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix memory leak
上级
9dae1f31
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
142 addition
and
26 deletion
+142
-26
source/dnode/vnode/tq/inc/tqMetaStore.h
source/dnode/vnode/tq/inc/tqMetaStore.h
+3
-10
source/dnode/vnode/tq/src/tqMetaStore.c
source/dnode/vnode/tq/src/tqMetaStore.c
+23
-16
source/dnode/vnode/tq/test/tqMetaTest.cpp
source/dnode/vnode/tq/test/tqMetaTest.cpp
+116
-0
未找到文件。
source/dnode/vnode/tq/inc/tqMetaStore.h
浏览文件 @
34581781
...
...
@@ -24,7 +24,9 @@
extern
"C"
{
#endif
#define TQ_BUCKET_SIZE 0xFF
#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256
#define TQ_PAGE_SIZE 4096
//key + offset + size
#define TQ_IDX_SIZE 24
...
...
@@ -35,15 +37,6 @@ extern "C" {
//4096 - 4080
#define TQ_IDX_PAGE_HEAD_SIZE 16
inline
static
int
TqMaxEntryOnePage
()
{
//170
return
TQ_PAGE_SIZE
/
TQ_IDX_SIZE
;
}
inline
static
int
TqEmptyTail
()
{
//16
return
TQ_PAGE_SIZE
-
TqMaxEntryOnePage
();
}
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
#define TQ_ACTION_INUSE_CONT 2
...
...
source/dnode/vnode/tq/src/tqMetaStore.c
浏览文件 @
34581781
...
...
@@ -130,7 +130,6 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta
->
deleter
=
deleter
;
//read idx file and load into memory
/*char idxBuf[TQ_PAGE_SIZE];*/
TqIdxPageBuf
idxBuf
;
TqSerializedHead
*
serializedObj
=
malloc
(
TQ_PAGE_SIZE
);
if
(
serializedObj
==
NULL
)
{
...
...
@@ -196,7 +195,7 @@ TqMetaStore* tqStoreOpen(const char* path,
}
//put into list
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_
SIZE
;
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_
MASK
;
TqMetaList
*
pBucketNode
=
pMeta
->
bucket
[
bucketKey
];
if
(
pBucketNode
==
NULL
)
{
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
...
...
@@ -205,15 +204,18 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
}
else
{
while
(
pBucketNode
->
next
&&
pBucketNode
->
next
->
handle
.
key
=
=
pNode
->
handle
.
key
)
{
pBucketNode
->
next
->
handle
.
key
!
=
pNode
->
handle
.
key
)
{
pBucketNode
=
pBucketNode
->
next
;
}
if
(
pBucketNode
->
next
)
{
ASSERT
(
pBucketNode
->
next
->
handle
.
key
==
pNode
->
handle
.
key
);
TqMetaList
*
pNodeTmp
=
pBucketNode
->
next
;
pBucketNode
->
next
=
pNodeTmp
->
next
;
pBucketNode
=
pNodeTmp
;
TqMetaList
*
pNodeFound
=
pBucketNode
->
next
;
pNode
->
next
=
pNodeFound
->
next
;
pBucketNode
->
next
=
pNode
;
pBucketNode
=
pNodeFound
;
}
else
{
pNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
pBucketNode
=
NULL
;
}
}
...
...
@@ -359,11 +361,13 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
*
(
bufPtr
++
)
=
pNode
->
handle
.
offset
;
*
(
bufPtr
++
)
=
(
int64_t
)
nBytes
;
idxBuf
.
head
.
writeOffset
+=
TQ_IDX_SIZE
;
if
(
idxBuf
.
head
.
writeOffset
>=
TQ_PAGE_SIZE
)
{
nBytes
=
write
(
pMeta
->
idxFd
,
&
idxBuf
,
TQ_PAGE_SIZE
);
//TODO: handle error with tfile
ASSERT
(
nBytes
==
TQ_PAGE_SIZE
);
memset
(
&
idxBuf
,
0
,
TQ_PAGE_SIZE
);
idxBuf
.
head
.
writeOffset
=
TQ_IDX_PAGE_HEAD_SIZE
;
bufPtr
=
(
int64_t
*
)
&
idxBuf
.
buffer
;
}
//remove from unpersist list
...
...
@@ -376,7 +380,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
if
(
pNode
->
handle
.
valueInUse
==
TQ_DELETE_TOKEN
&&
pNode
->
handle
.
valueInTxn
==
NULL
)
{
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_
SIZE
;
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_
MASK
;
TqMetaList
*
pBucketHead
=
pMeta
->
bucket
[
bucketKey
];
if
(
pBucketHead
==
pNode
)
{
pMeta
->
bucket
[
bucketKey
]
=
pNode
->
next
;
...
...
@@ -409,7 +413,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
}
static
int32_t
tqHandlePutCommitted
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_
SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_
MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
...
...
@@ -442,7 +446,7 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
}
void
*
tqHandleGet
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_
SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_
MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
...
...
@@ -460,7 +464,7 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
}
int32_t
tqHandleMovePut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_
SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_
MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
...
...
@@ -498,7 +502,7 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi
return
-
1
;
}
memcpy
(
vmem
,
value
,
vsize
);
int64_t
bucketKey
=
key
&
TQ_BUCKET_
SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_
MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
...
...
@@ -530,7 +534,7 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi
}
static
void
*
tqHandleGetUncommitted
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_
SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_
MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
...
...
@@ -548,10 +552,13 @@ static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) {
}
int32_t
tqHandleCommit
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_
SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_
MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
==
NULL
)
{
return
-
1
;
}
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
...
...
@@ -564,11 +571,11 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
pNode
=
pNode
->
next
;
}
}
return
-
1
;
return
-
2
;
}
int32_t
tqHandleAbort
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_
SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_
MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
...
...
@@ -589,7 +596,7 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
}
int32_t
tqHandleDel
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_
SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_
MASK
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
...
...
source/dnode/vnode/tq/test/tqMetaTest.cpp
浏览文件 @
34581781
...
...
@@ -176,3 +176,119 @@ TEST_F(TqMetaTest, intxnPersist) {
pFoo1
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo1
->
a
,
4
);
}
TEST_F
(
TqMetaTest
,
multiplePage
)
{
srand
(
0
);
std
::
vector
<
int
>
v
;
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
v
.
push_back
(
rand
());
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
}
for
(
int
i
=
0
;
i
<
500
;
i
++
)
{
tqHandleCommit
(
pMeta
,
i
);
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
i
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
)
<<
" at idx "
<<
i
<<
"
\n
"
;
EXPECT_EQ
(
pFoo
->
a
,
v
[
i
]);
}
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
ASSERT
(
pMeta
);
for
(
int
i
=
500
;
i
<
1000
;
i
++
)
{
tqHandleCommit
(
pMeta
,
i
);
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
i
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
)
<<
" at idx "
<<
i
<<
"
\n
"
;
EXPECT_EQ
(
pFoo
->
a
,
v
[
i
]);
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
i
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
)
<<
" at idx "
<<
i
<<
"
\n
"
;
EXPECT_EQ
(
pFoo
->
a
,
v
[
i
]);
}
}
TEST_F
(
TqMetaTest
,
multipleRewrite
)
{
srand
(
0
);
std
::
vector
<
int
>
v
;
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
v
.
push_back
(
rand
());
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
}
for
(
int
i
=
0
;
i
<
500
;
i
++
)
{
tqHandleCommit
(
pMeta
,
i
);
v
[
i
]
=
rand
();
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
}
for
(
int
i
=
500
;
i
<
1000
;
i
++
)
{
v
[
i
]
=
rand
();
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
tqHandleCommit
(
pMeta
,
i
);
}
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
ASSERT
(
pMeta
);
for
(
int
i
=
500
;
i
<
1000
;
i
++
)
{
v
[
i
]
=
rand
();
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
tqHandleCommit
(
pMeta
,
i
);
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
i
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
)
<<
" at idx "
<<
i
<<
"
\n
"
;
EXPECT_EQ
(
pFoo
->
a
,
v
[
i
]);
}
}
TEST_F
(
TqMetaTest
,
dupCommit
)
{
srand
(
0
);
std
::
vector
<
int
>
v
;
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
v
.
push_back
(
rand
());
Foo
foo
;
foo
.
a
=
v
[
i
];
tqHandleCopyPut
(
pMeta
,
i
,
&
foo
,
sizeof
(
Foo
));
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
int
ret
=
tqHandleCommit
(
pMeta
,
i
);
EXPECT_EQ
(
ret
,
0
);
ret
=
tqHandleCommit
(
pMeta
,
i
);
EXPECT_EQ
(
ret
,
-
1
);
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
int
ret
=
tqHandleCommit
(
pMeta
,
i
);
EXPECT_EQ
(
ret
,
-
1
);
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
i
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
)
<<
" at idx "
<<
i
<<
"
\n
"
;
EXPECT_EQ
(
pFoo
->
a
,
v
[
i
]);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录