Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
43434732
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
43434732
编写于
11月 05, 2021
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/tq' into 3.0
上级
c5b2a9e0
7c2048e7
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
501 addition
and
122 deletion
+501
-122
include/server/vnode/tq/tq.h
include/server/vnode/tq/tq.h
+0
-3
source/dnode/vnode/tq/CMakeLists.txt
source/dnode/vnode/tq/CMakeLists.txt
+4
-0
source/dnode/vnode/tq/inc/tqMetaStore.h
source/dnode/vnode/tq/inc/tqMetaStore.h
+37
-15
source/dnode/vnode/tq/src/tqMetaStore.c
source/dnode/vnode/tq/src/tqMetaStore.c
+305
-102
source/dnode/vnode/tq/test/CMakeLists.txt
source/dnode/vnode/tq/test/CMakeLists.txt
+20
-0
source/dnode/vnode/tq/test/tqMetaTest.cpp
source/dnode/vnode/tq/test/tqMetaTest.cpp
+133
-0
source/dnode/vnode/tq/test/tqTests.cpp
source/dnode/vnode/tq/test/tqTests.cpp
+0
-0
source/os/src/osDir.c
source/os/src/osDir.c
+2
-2
未找到文件。
include/server/vnode/tq/tq.h
浏览文件 @
43434732
...
@@ -19,9 +19,6 @@
...
@@ -19,9 +19,6 @@
#include "os.h"
#include "os.h"
#include "tutil.h"
#include "tutil.h"
#define TQ_ACTION_INSERT 0x7f7f7f7fULL
#define TQ_ACTION_DELETE 0x80808080ULL
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
...
...
source/dnode/vnode/tq/CMakeLists.txt
浏览文件 @
43434732
...
@@ -12,3 +12,7 @@ target_link_libraries(
...
@@ -12,3 +12,7 @@ target_link_libraries(
PUBLIC os
PUBLIC os
PUBLIC util
PUBLIC util
)
)
if
(
${
BUILD_TEST
}
)
add_subdirectory
(
test
)
endif
(
${
BUILD_TEST
}
)
source/dnode/vnode/tq/inc/tqMetaStore.h
浏览文件 @
43434732
...
@@ -19,6 +19,11 @@
...
@@ -19,6 +19,11 @@
#include "os.h"
#include "os.h"
#include "tq.h"
#include "tq.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#define TQ_BUCKET_SIZE 0xFF
#define TQ_BUCKET_SIZE 0xFF
#define TQ_PAGE_SIZE 4096
#define TQ_PAGE_SIZE 4096
//key + offset + size
//key + offset + size
...
@@ -32,9 +37,23 @@ inline static int TqEmptyTail() { //16
...
@@ -32,9 +37,23 @@ inline static int TqEmptyTail() { //16
return
TQ_PAGE_SIZE
-
TqMaxEntryOnePage
();
return
TQ_PAGE_SIZE
-
TqMaxEntryOnePage
();
}
}
#ifdef __cplusplus
#define TQ_ACTION_CONST 0
extern
"C"
{
#define TQ_ACTION_INUSE 1
#endif
#define TQ_ACTION_INUSE_CONT 2
#define TQ_ACTION_INTXN 3
#define TQ_SVER 0
static
const
int8_t
TQ_CONST_DELETE
=
TQ_ACTION_CONST
;
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
typedef
struct
TqSerializedHead
{
int16_t
ver
;
int16_t
action
;
int32_t
checksum
;
int64_t
ssize
;
char
content
[];
}
TqSerializedHead
;
typedef
struct
TqMetaHandle
{
typedef
struct
TqMetaHandle
{
int64_t
key
;
int64_t
key
;
...
@@ -59,30 +78,33 @@ typedef struct TqMetaStore {
...
@@ -59,30 +78,33 @@ typedef struct TqMetaStore {
TqMetaList
*
unpersistHead
;
TqMetaList
*
unpersistHead
;
int
fileFd
;
//TODO:temporaral use, to be replaced by unified tfile
int
fileFd
;
//TODO:temporaral use, to be replaced by unified tfile
int
idxFd
;
//TODO:temporaral use, to be replaced by unified tfile
int
idxFd
;
//TODO:temporaral use, to be replaced by unified tfile
int
(
*
serializer
)(
TqGroupHandle
*
,
void
**
);
char
*
dirPath
;
const
void
*
(
*
deserializer
)(
const
void
*
,
TqGroupHandle
*
);
int
(
*
serializer
)(
const
void
*
pObj
,
TqSerializedHead
**
ppHead
);
const
void
*
(
*
deserializer
)(
const
TqSerializedHead
*
pHead
,
void
**
ppObj
);
void
(
*
deleter
)(
void
*
);
void
(
*
deleter
)(
void
*
);
}
TqMetaStore
;
}
TqMetaStore
;
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
int
serializer
(
TqGroupHandle
*
,
void
**
),
int
serializer
(
const
void
*
pObj
,
TqSerializedHead
**
ppHead
),
const
void
*
deserializer
(
const
void
*
,
TqGroupHandle
*
),
const
void
*
deserializer
(
const
TqSerializedHead
*
pHead
,
void
**
ppObj
),
void
deleter
(
void
*
));
void
deleter
(
void
*
pObj
));
int32_t
tqStoreClose
(
TqMetaStore
*
);
int32_t
tqStoreClose
(
TqMetaStore
*
);
//int32_t tqStoreDelete(TqMetaStore*);
//int32_t tqStoreDelete(TqMetaStore*);
//int32_t TqStoreCommitAll(TqMetaStore*);
//int32_t TqStoreCommitAll(TqMetaStore*);
int32_t
tqStorePersist
(
TqMetaStore
*
);
int32_t
tqStorePersist
(
TqMetaStore
*
);
TqMetaHandle
*
tqHandleGet
(
TqMetaStore
*
,
int64_t
key
);
void
*
tqHandleGet
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandlePut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
int32_t
tqHandleMovePut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
int32_t
tqHandleCopyPut
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
,
size_t
vsize
);
//do commit
//do commit
int32_t
tqHandleCommit
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleCommit
(
TqMetaStore
*
,
int64_t
key
);
//delete uncommitted
//delete uncommitted
int32_t
tqHandleAbort
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleAbort
(
TqMetaStore
*
,
int64_t
key
);
//delete committed
//delete committed kv pair
int32_t
tqHandleDel
(
TqMetaStore
*
,
int64_t
key
);
//notice that a delete action still needs to be committed
int32_t
tqHandleDel
(
TqMetaStore
*
,
int64_t
key
);
//delete both committed and uncommitted
//delete both committed and uncommitted
int32_t
tqHandleClear
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleClear
(
TqMetaStore
*
,
int64_t
key
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/vnode/tq/src/tqMetaStore.c
浏览文件 @
43434732
...
@@ -14,6 +14,7 @@
...
@@ -14,6 +14,7 @@
*/
*/
#include "tqMetaStore.h"
#include "tqMetaStore.h"
//TODO:replace by an abstract file layer
//TODO:replace by an abstract file layer
#include "osDir.h"
#include <fcntl.h>
#include <fcntl.h>
#include <string.h>
#include <string.h>
#include <unistd.h>
#include <unistd.h>
...
@@ -22,8 +23,18 @@
...
@@ -22,8 +23,18 @@
#define TQ_IDX_NAME "tq.idx"
#define TQ_IDX_NAME "tq.idx"
static
int32_t
tqHandlePutCommitted
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
static
int32_t
tqHandlePutCommitted
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
static
TqMetaHandle
*
tqHandleGetUncommitted
(
TqMetaStore
*
,
int64_t
key
);
static
void
*
tqHandleGetUncommitted
(
TqMetaStore
*
,
int64_t
key
);
static
inline
void
tqLinkUnpersist
(
TqMetaStore
*
pMeta
,
TqMetaList
*
pNode
)
{
if
(
pNode
->
unpersistNext
==
NULL
)
{
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNode
;
}
}
typedef
struct
TqMetaPageBuf
{
typedef
struct
TqMetaPageBuf
{
int16_t
offset
;
int16_t
offset
;
...
@@ -31,23 +42,34 @@ typedef struct TqMetaPageBuf {
...
@@ -31,23 +42,34 @@ typedef struct TqMetaPageBuf {
}
TqMetaPageBuf
;
}
TqMetaPageBuf
;
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
int
serializer
(
TqGroupHandle
*
,
void
**
),
int
serializer
(
const
void
*
pObj
,
TqSerializedHead
**
ppHead
),
const
void
*
deserializer
(
const
void
*
,
TqGroupHandle
*
),
const
void
*
deserializer
(
const
TqSerializedHead
*
pHead
,
void
**
ppObj
),
void
deleter
(
void
*
))
{
void
deleter
(
void
*
pObj
))
{
TqMetaStore
*
pMeta
=
malloc
(
sizeof
(
TqMetaStore
));
TqMetaStore
*
pMeta
=
malloc
(
sizeof
(
TqMetaStore
));
if
(
pMeta
==
NULL
)
{
if
(
pMeta
==
NULL
)
{
//close
//close
return
NULL
;
return
NULL
;
}
}
memset
(
pMeta
,
0
,
sizeof
(
TqMetaStore
));
//concat data file name and index file name
//concat data file name and index file name
size_t
pathLen
=
strlen
(
path
);
size_t
pathLen
=
strlen
(
path
);
pMeta
->
dirPath
=
malloc
(
pathLen
+
1
);
if
(
pMeta
->
dirPath
!=
NULL
)
{
//TODO: memory insufficient
}
strcpy
(
pMeta
->
dirPath
,
path
);
char
name
[
pathLen
+
10
];
char
name
[
pathLen
+
10
];
strcpy
(
name
,
path
);
strcpy
(
name
,
path
);
if
(
!
taosDirExist
(
name
)
&&
!
taosMkDir
(
name
))
{
ASSERT
(
false
);
}
strcat
(
name
,
"/"
TQ_IDX_NAME
);
strcat
(
name
,
"/"
TQ_IDX_NAME
);
int
idxFd
=
open
(
name
,
O_
WRONLY
|
O_CREAT
|
O_EXCL
,
0755
);
int
idxFd
=
open
(
name
,
O_
RDWR
|
O_CREAT
,
0755
);
if
(
idxFd
<
0
)
{
if
(
idxFd
<
0
)
{
ASSERT
(
false
);
//close file
//close file
//free memory
//free memory
return
NULL
;
return
NULL
;
...
@@ -56,17 +78,24 @@ TqMetaStore* tqStoreOpen(const char* path,
...
@@ -56,17 +78,24 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta
->
idxFd
=
idxFd
;
pMeta
->
idxFd
=
idxFd
;
pMeta
->
unpersistHead
=
malloc
(
sizeof
(
TqMetaList
));
pMeta
->
unpersistHead
=
malloc
(
sizeof
(
TqMetaList
));
if
(
pMeta
->
unpersistHead
==
NULL
)
{
if
(
pMeta
->
unpersistHead
==
NULL
)
{
ASSERT
(
false
);
//close file
//close file
//free memory
//free memory
return
NULL
;
return
NULL
;
}
}
memset
(
pMeta
->
unpersistHead
,
0
,
sizeof
(
TqMetaList
));
pMeta
->
unpersistHead
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistPrev
=
pMeta
->
unpersistHead
;
strcpy
(
name
,
path
);
strcpy
(
name
,
path
);
strcat
(
name
,
"/"
TQ_META_NAME
);
strcat
(
name
,
"/"
TQ_META_NAME
);
int
fileFd
=
open
(
name
,
O_WRONLY
|
O_CREAT
|
O_EXCL
,
0755
);
int
fileFd
=
open
(
name
,
O_RDWR
|
O_CREAT
,
0755
);
if
(
fileFd
<
0
)
return
NULL
;
if
(
fileFd
<
0
){
ASSERT
(
false
);
return
NULL
;
}
memset
(
pMeta
,
0
,
sizeof
(
TqMetaStore
));
pMeta
->
fileFd
=
fileFd
;
pMeta
->
fileFd
=
fileFd
;
pMeta
->
serializer
=
serializer
;
pMeta
->
serializer
=
serializer
;
...
@@ -74,23 +103,103 @@ TqMetaStore* tqStoreOpen(const char* path,
...
@@ -74,23 +103,103 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta
->
deleter
=
deleter
;
pMeta
->
deleter
=
deleter
;
//read idx file and load into memory
//read idx file and load into memory
char
readBuf
[
TQ_PAGE_SIZE
];
char
idxBuf
[
TQ_PAGE_SIZE
];
int
readSize
;
TqSerializedHead
*
serializedObj
=
malloc
(
TQ_PAGE_SIZE
);
while
((
readSize
=
read
(
idxFd
,
readBuf
,
TQ_PAGE_SIZE
))
!=
-
1
)
{
if
(
serializedObj
==
NULL
)
{
//TODO:memory insufficient
}
int
idxRead
;
int
allocated
=
TQ_PAGE_SIZE
;
while
((
idxRead
=
read
(
idxFd
,
idxBuf
,
TQ_PAGE_SIZE
)))
{
if
(
idxRead
==
-
1
)
{
//TODO: handle error
ASSERT
(
false
);
}
//loop read every entry
//loop read every entry
for
(
int
i
=
0
;
i
<
readSize
;
i
+=
TQ_IDX_ENTRY_SIZE
)
{
for
(
int
i
=
0
;
i
<
idxRead
;
i
+=
TQ_IDX_ENTRY_SIZE
)
{
TqMetaList
*
pNode
=
malloc
(
sizeof
(
TqMetaHandle
));
TqMetaList
*
pNode
=
malloc
(
sizeof
(
TqMetaList
));
memset
(
pNode
,
0
,
sizeof
(
TqMetaList
));
if
(
pNode
==
NULL
)
{
if
(
pNode
==
NULL
)
{
//TODO: free memory and return error
//TODO: free memory and return error
}
}
memcpy
(
&
pNode
->
handle
,
&
readBuf
[
i
],
TQ_IDX_ENTRY_SIZE
);
memset
(
pNode
,
0
,
sizeof
(
TqMetaList
));
memcpy
(
&
pNode
->
handle
,
&
idxBuf
[
i
],
TQ_IDX_ENTRY_SIZE
);
lseek
(
fileFd
,
pNode
->
handle
.
offset
,
SEEK_CUR
);
if
(
allocated
<
pNode
->
handle
.
serializedSize
)
{
void
*
ptr
=
realloc
(
serializedObj
,
pNode
->
handle
.
serializedSize
);
if
(
ptr
==
NULL
)
{
//TODO: memory insufficient
}
serializedObj
=
ptr
;
allocated
=
pNode
->
handle
.
serializedSize
;
}
serializedObj
->
ssize
=
pNode
->
handle
.
serializedSize
;
if
(
read
(
fileFd
,
serializedObj
,
pNode
->
handle
.
serializedSize
)
!=
pNode
->
handle
.
serializedSize
)
{
//TODO: read error
}
if
(
serializedObj
->
action
==
TQ_ACTION_INUSE
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
TqSerializedHead
))
{
pMeta
->
deserializer
(
serializedObj
,
&
pNode
->
handle
.
valueInUse
);
}
else
{
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
}
}
else
if
(
serializedObj
->
action
==
TQ_ACTION_INTXN
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
TqSerializedHead
))
{
pMeta
->
deserializer
(
serializedObj
,
&
pNode
->
handle
.
valueInTxn
);
}
else
{
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
}
}
else
if
(
serializedObj
->
action
==
TQ_ACTION_INUSE_CONT
)
{
if
(
serializedObj
->
ssize
!=
sizeof
(
TqSerializedHead
))
{
pMeta
->
deserializer
(
serializedObj
,
&
pNode
->
handle
.
valueInUse
);
}
else
{
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
}
serializedObj
=
POINTER_SHIFT
(
serializedObj
,
serializedObj
->
ssize
);
if
(
serializedObj
->
ssize
!=
sizeof
(
TqSerializedHead
))
{
pMeta
->
deserializer
(
serializedObj
,
&
pNode
->
handle
.
valueInTxn
);
}
else
{
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
}
}
else
{
ASSERT
(
0
);
}
//put into list
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_SIZE
;
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_SIZE
;
pNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
TqMetaList
*
pBucketNode
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
if
(
pBucketNode
==
NULL
)
{
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
}
else
if
(
pBucketNode
->
handle
.
key
==
pNode
->
handle
.
key
)
{
pNode
->
next
=
pBucketNode
->
next
;
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
}
else
{
while
(
pBucketNode
->
next
&&
pBucketNode
->
next
->
handle
.
key
==
pNode
->
handle
.
key
)
{
pBucketNode
=
pBucketNode
->
next
;
}
if
(
pBucketNode
->
next
)
{
ASSERT
(
pBucketNode
->
next
->
handle
.
key
==
pNode
->
handle
.
key
);
TqMetaList
*
pNodeTmp
=
pBucketNode
->
next
;
pBucketNode
->
next
=
pNodeTmp
->
next
;
pBucketNode
=
pNodeTmp
;
}
else
{
pBucketNode
=
NULL
;
}
}
if
(
pBucketNode
)
{
if
(
pBucketNode
->
handle
.
valueInUse
&&
pBucketNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pBucketNode
->
handle
.
valueInUse
);
}
if
(
pBucketNode
->
handle
.
valueInTxn
&&
pBucketNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pBucketNode
->
handle
.
valueInTxn
);
}
free
(
pBucketNode
);
}
}
}
}
}
free
(
serializedObj
);
return
pMeta
;
return
pMeta
;
}
}
...
@@ -102,30 +211,54 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
...
@@ -102,30 +211,54 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
close
(
pMeta
->
idxFd
);
close
(
pMeta
->
idxFd
);
//free memory
//free memory
for
(
int
i
=
0
;
i
<
TQ_BUCKET_SIZE
;
i
++
)
{
for
(
int
i
=
0
;
i
<
TQ_BUCKET_SIZE
;
i
++
)
{
TqMetaList
*
n
ode
=
pMeta
->
bucket
[
i
];
TqMetaList
*
pN
ode
=
pMeta
->
bucket
[
i
];
pMeta
->
bucket
[
i
]
=
NULL
;
while
(
pNode
)
{
while
(
node
)
{
ASSERT
(
pNode
->
unpersistNext
==
NULL
);
ASSERT
(
node
->
unpersistNext
==
NULL
);
ASSERT
(
pNode
->
unpersistPrev
==
NULL
);
ASSERT
(
node
->
unpersistPrev
==
NULL
);
if
(
pNode
->
handle
.
valueInTxn
if
(
node
->
handle
.
valueInTxn
)
{
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
n
ode
->
handle
.
valueInTxn
);
pMeta
->
deleter
(
pN
ode
->
handle
.
valueInTxn
);
}
}
if
(
node
->
handle
.
valueInUse
)
{
if
(
pNode
->
handle
.
valueInUse
pMeta
->
deleter
(
node
->
handle
.
valueInUse
);
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
}
TqMetaList
*
next
=
n
ode
->
next
;
TqMetaList
*
next
=
pN
ode
->
next
;
free
(
n
ode
);
free
(
pN
ode
);
n
ode
=
next
;
pN
ode
=
next
;
}
}
}
}
free
(
pMeta
->
dirPath
);
free
(
pMeta
->
unpersistHead
);
free
(
pMeta
);
free
(
pMeta
);
return
0
;
return
0
;
}
}
int32_t
tqStoreDelete
(
TqMetaStore
*
pMeta
)
{
int32_t
tqStoreDelete
(
TqMetaStore
*
pMeta
)
{
//close file
close
(
pMeta
->
fileFd
);
//delete file
close
(
pMeta
->
idxFd
);
//free memory
//free memory
for
(
int
i
=
0
;
i
<
TQ_BUCKET_SIZE
;
i
++
)
{
TqMetaList
*
pNode
=
pMeta
->
bucket
[
i
];
pMeta
->
bucket
[
i
]
=
NULL
;
while
(
pNode
)
{
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
TqMetaList
*
next
=
pNode
->
next
;
free
(
pNode
);
pNode
=
next
;
}
}
free
(
pMeta
->
unpersistHead
);
taosRemoveDir
(
pMeta
->
dirPath
);
free
(
pMeta
->
dirPath
);
free
(
pMeta
);
return
0
;
return
0
;
}
}
...
@@ -135,69 +268,89 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
...
@@ -135,69 +268,89 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
int64_t
*
bufPtr
=
(
int64_t
*
)
writeBuf
;
int64_t
*
bufPtr
=
(
int64_t
*
)
writeBuf
;
TqMetaList
*
pHead
=
pMeta
->
unpersistHead
;
TqMetaList
*
pHead
=
pMeta
->
unpersistHead
;
TqMetaList
*
pNode
=
pHead
->
unpersistNext
;
TqMetaList
*
pNode
=
pHead
->
unpersistNext
;
TqSerializedHead
*
pSHead
=
malloc
(
sizeof
(
TqSerializedHead
));
if
(
pSHead
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
pSHead
->
ver
=
TQ_SVER
;
pSHead
->
checksum
=
0
;
pSHead
->
ssize
=
sizeof
(
TqSerializedHead
);
int
allocatedSize
=
sizeof
(
TqSerializedHead
);
int
offset
=
lseek
(
pMeta
->
fileFd
,
0
,
SEEK_CUR
);
while
(
pHead
!=
pNode
)
{
while
(
pHead
!=
pNode
)
{
if
(
pNode
->
handle
.
valueInUse
==
NULL
)
{
int
nBytes
=
0
;
//put delete token in data file
uint32_t
delete
=
TQ_ACTION_DELETE
;
int
nBytes
=
write
(
pMeta
->
fileFd
,
&
delete
,
sizeof
(
uint32_t
));
ASSERT
(
nBytes
==
sizeof
(
uint32_t
));
//remove from list
if
(
pNode
->
handle
.
valueInUse
)
{
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_SIZE
;
if
(
pNode
->
handle
.
valueInTxn
)
{
TqMetaList
*
pBucketHead
=
pMeta
->
bucket
[
bucketKey
];
pSHead
->
action
=
TQ_ACTION_INUSE_CONT
;
if
(
pBucketHead
==
pNode
)
{
pMeta
->
bucket
[
bucketKey
]
=
pBucketHead
->
next
;
}
else
{
}
else
{
TqMetaList
*
pBucketNode
=
pBucketHead
;
pSHead
->
action
=
TQ_ACTION_INUSE
;
while
(
pBucketNode
->
next
!=
NULL
&&
pBucketNode
->
next
!=
pNode
)
{
pBucketNode
=
pBucketNode
->
next
;
}
if
(
pBucketNode
->
next
!=
NULL
)
{
ASSERT
(
pBucketNode
->
next
==
pNode
);
pBucketNode
->
next
=
pNode
->
next
;
if
(
pNode
->
handle
.
valueInUse
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
free
(
pNode
);
}
}
}
if
(
pNode
->
handle
.
valueInUse
==
TQ_DELETE_TOKEN
)
{
pSHead
->
ssize
=
sizeof
(
TqSerializedHead
);
}
else
{
pMeta
->
serializer
(
pNode
->
handle
.
valueInUse
,
&
pSHead
);
}
nBytes
=
write
(
pMeta
->
fileFd
,
pSHead
,
pSHead
->
ssize
);
ASSERT
(
nBytes
==
pSHead
->
ssize
);
}
}
//serialize
void
*
pBytes
=
NULL
;
int
sz
=
pMeta
->
serializer
(
pNode
->
handle
.
valueInUse
,
&
pBytes
);
ASSERT
(
pBytes
!=
NULL
);
//get current offset
//append data
int64_t
offset
=
lseek
(
pMeta
->
fileFd
,
0
,
SEEK_CUR
);
int
nBytes
=
write
(
pMeta
->
fileFd
,
pBytes
,
sz
);
//TODO: handle error in tfile
ASSERT
(
nBytes
==
sz
);
pNode
->
handle
.
offset
=
offset
;
if
(
pNode
->
handle
.
valueInTxn
)
{
pNode
->
handle
.
serializedSize
=
sz
;
pSHead
->
action
=
TQ_ACTION_INTXN
;
if
(
pNode
->
handle
.
valueInTxn
==
TQ_DELETE_TOKEN
)
{
pSHead
->
ssize
=
sizeof
(
TqSerializedHead
);
}
else
{
pMeta
->
serializer
(
pNode
->
handle
.
valueInTxn
,
&
pSHead
);
}
int
nBytesTxn
=
write
(
pMeta
->
fileFd
,
pSHead
,
pSHead
->
ssize
);
ASSERT
(
nBytesTxn
==
pSHead
->
ssize
);
nBytes
+=
nBytesTxn
;
}
//write idx
//write idx
file
//TODO: endian check and convert
//TODO: endian check and convert
*
(
bufPtr
++
)
=
pNode
->
handle
.
key
;
*
(
bufPtr
++
)
=
pNode
->
handle
.
key
;
*
(
bufPtr
++
)
=
pNode
->
handle
.
offset
;
*
(
bufPtr
++
)
=
pNode
->
handle
.
offset
;
*
(
bufPtr
++
)
=
(
int64_t
)
sz
;
*
(
bufPtr
++
)
=
(
int64_t
)
nBytes
;
if
((
char
*
)(
bufPtr
+
3
)
>
writeBuf
+
TQ_PAGE_SIZE
)
{
if
((
char
*
)(
bufPtr
+
3
)
>
writeBuf
+
TQ_PAGE_SIZE
)
{
nBytes
=
write
(
pMeta
->
idxFd
,
writeBuf
,
sizeof
(
writeBuf
));
nBytes
=
write
(
pMeta
->
idxFd
,
writeBuf
,
sizeof
(
writeBuf
));
//TODO: handle error
in
tfile
//TODO: handle error
with
tfile
ASSERT
(
nBytes
==
sizeof
(
writeBuf
));
ASSERT
(
nBytes
==
sizeof
(
writeBuf
));
memset
(
writeBuf
,
0
,
TQ_PAGE_SIZE
);
memset
(
writeBuf
,
0
,
TQ_PAGE_SIZE
);
bufPtr
=
(
int64_t
*
)
writeBuf
;
bufPtr
=
(
int64_t
*
)
writeBuf
;
}
}
//remove from unpersist list
//remove from unpersist list
pHead
->
unpersistNext
=
pNode
->
unpersistNext
;
pHead
->
unpersistNext
=
pNode
->
unpersistNext
;
pHead
->
unpersistNext
->
unpersistPrev
=
pHead
;
pHead
->
unpersistNext
->
unpersistPrev
=
pHead
;
pNode
->
unpersistPrev
=
pNode
->
unpersistNext
=
NULL
;
pNode
->
unpersistPrev
=
pNode
->
unpersistNext
=
NULL
;
pNode
=
pHead
->
unpersistNext
;
pNode
=
pHead
->
unpersistNext
;
//remove from bucket
if
(
pNode
->
handle
.
valueInUse
==
TQ_DELETE_TOKEN
&&
pNode
->
handle
.
valueInTxn
==
NULL
)
{
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pBucketHead
=
pMeta
->
bucket
[
bucketKey
];
if
(
pBucketHead
==
pNode
)
{
pMeta
->
bucket
[
bucketKey
]
=
pNode
->
next
;
}
else
{
TqMetaList
*
pBucketNode
=
pBucketHead
;
while
(
pBucketNode
->
next
!=
NULL
&&
pBucketNode
->
next
!=
pNode
)
{
pBucketNode
=
pBucketNode
->
next
;
}
//impossible for pBucket->next == NULL
ASSERT
(
pBucketNode
->
next
==
pNode
);
pBucketNode
->
next
=
pNode
->
next
;
}
free
(
pNode
);
}
}
}
//write left bytes
//write left bytes
free
(
pSHead
);
if
((
char
*
)
bufPtr
!=
writeBuf
)
{
if
((
char
*
)
bufPtr
!=
writeBuf
)
{
int
used
=
(
char
*
)
bufPtr
-
writeBuf
;
int
used
=
(
char
*
)
bufPtr
-
writeBuf
;
int
nBytes
=
write
(
pMeta
->
idxFd
,
writeBuf
,
used
);
int
nBytes
=
write
(
pMeta
->
idxFd
,
writeBuf
,
used
);
...
@@ -216,7 +369,10 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
...
@@ -216,7 +369,10 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
while
(
pNode
)
{
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
//TODO: think about thread safety
//TODO: think about thread safety
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
//change pointer ownership
//change pointer ownership
pNode
->
handle
.
valueInUse
=
value
;
pNode
->
handle
.
valueInUse
=
value
;
return
0
;
return
0
;
...
@@ -240,13 +396,13 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
...
@@ -240,13 +396,13 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
return
0
;
return
0
;
}
}
TqMetaHandle
*
tqHandleGet
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
void
*
tqHandleGet
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
return
&
pNode
->
handl
e
;
return
pNode
->
handle
.
valueInUs
e
;
}
else
{
}
else
{
return
NULL
;
return
NULL
;
}
}
...
@@ -257,15 +413,19 @@ TqMetaHandle* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
...
@@ -257,15 +413,19 @@ TqMetaHandle* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
return
NULL
;
return
NULL
;
}
}
int32_t
tqHandlePut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int32_t
tqHandle
Move
Put
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
//TODO: think about thread safety
//TODO: think about thread safety
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
//change pointer ownership
//change pointer ownership
pNode
->
handle
.
valueInTxn
=
value
;
pNode
->
handle
.
valueInTxn
=
value
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
return
0
;
}
else
{
}
else
{
pNode
=
pNode
->
next
;
pNode
=
pNode
->
next
;
...
@@ -279,16 +439,58 @@ int32_t tqHandlePut(TqMetaStore* pMeta, int64_t key, void* value) {
...
@@ -279,16 +439,58 @@ int32_t tqHandlePut(TqMetaStore* pMeta, int64_t key, void* value) {
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInTxn
=
value
;
pNewNode
->
handle
.
valueInTxn
=
value
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNewNode
;
tqLinkUnpersist
(
pMeta
,
pNewNode
);
return
0
;
return
0
;
}
}
static
TqMetaHandle
*
tqHandleGetUncommitted
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int32_t
tqHandleCopyPut
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
,
size_t
vsize
)
{
void
*
vmem
=
malloc
(
vsize
);
if
(
vmem
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memcpy
(
vmem
,
value
,
vsize
);
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
)
{
//TODO: think about thread safety
return
&
pNode
->
handle
;
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
//change pointer ownership
pNode
->
handle
.
valueInTxn
=
vmem
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
TqMetaList
*
pNewNode
=
malloc
(
sizeof
(
TqMetaList
));
if
(
pNewNode
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInTxn
=
vmem
;
pNewNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNewNode
;
tqLinkUnpersist
(
pMeta
,
pNewNode
);
return
0
;
}
static
void
*
tqHandleGetUncommitted
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
return
pNode
->
handle
.
valueInTxn
;
}
else
{
}
else
{
return
NULL
;
return
NULL
;
}
}
...
@@ -304,16 +506,13 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
...
@@ -304,16 +506,13 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
if
(
pNode
->
handle
.
valueInUse
&&
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
}
pNode
->
handle
.
valueInUse
=
pNode
->
handle
.
valueInTxn
;
pNode
->
handle
.
valueInUse
=
pNode
->
handle
.
valueInTxn
;
if
(
pNode
->
unpersistNext
==
NULL
)
{
pNode
->
handle
.
valueInTxn
=
NULL
;
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
tqLinkUnpersist
(
pMeta
,
pNode
);
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNode
;
}
return
0
;
return
0
;
}
else
{
}
else
{
pNode
=
pNode
->
next
;
pNode
=
pNode
->
next
;
...
@@ -327,9 +526,12 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
...
@@ -327,9 +526,12 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
)
{
if
(
pNode
->
handle
.
valueInTxn
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
pNode
->
handle
.
valueInTxn
=
NULL
;
pNode
->
handle
.
valueInTxn
=
NULL
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
return
0
;
}
}
return
-
1
;
return
-
1
;
...
@@ -344,9 +546,11 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
...
@@ -344,9 +546,11 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
&&
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
pNode
->
handle
.
valueInTxn
=
NULL
;
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
return
0
;
}
else
{
}
else
{
pNode
=
pNode
->
next
;
pNode
=
pNode
->
next
;
...
@@ -364,21 +568,20 @@ int32_t tqHandleClear(TqMetaStore* pMeta, int64_t key) {
...
@@ -364,21 +568,20 @@ int32_t tqHandleClear(TqMetaStore* pMeta, int64_t key) {
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
exist
=
true
;
exist
=
true
;
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
if
(
pNode
->
handle
.
valueInUse
!=
TQ_DELETE_TOKEN
)
{
pNode
->
handle
.
valueInUse
=
NULL
;
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
}
}
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
)
{
exist
=
true
;
exist
=
true
;
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
pNode
->
handle
.
valueInTxn
=
NULL
;
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
}
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
}
}
if
(
exist
)
{
if
(
exist
)
{
if
(
pNode
->
unpersistNext
==
NULL
)
{
tqLinkUnpersist
(
pMeta
,
pNode
);
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNode
;
}
return
0
;
return
0
;
}
}
return
-
1
;
return
-
1
;
...
...
source/dnode/vnode/tq/test/CMakeLists.txt
0 → 100644
浏览文件 @
43434732
add_executable
(
tqTest
""
)
target_sources
(
tqTest
PRIVATE
"tqMetaTest.cpp"
)
target_include_directories
(
tqTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/server/vnode/tq"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
tqTest
tq
gtest_main
)
enable_testing
()
add_test
(
NAME tq_test
COMMAND tqTest
)
source/dnode/vnode/tq/test/tqMetaTest.cpp
0 → 100644
浏览文件 @
43434732
#include <gtest/gtest.h>
#include <cstring>
#include <iostream>
#include <queue>
#include "tqMetaStore.h"
struct
Foo
{
int32_t
a
;
};
int
FooSerializer
(
const
void
*
pObj
,
TqSerializedHead
**
ppHead
)
{
Foo
*
foo
=
(
Foo
*
)
pObj
;
if
((
*
ppHead
)
==
NULL
||
(
*
ppHead
)
->
ssize
<
sizeof
(
TqSerializedHead
)
+
sizeof
(
int32_t
))
{
*
ppHead
=
(
TqSerializedHead
*
)
realloc
(
*
ppHead
,
sizeof
(
TqSerializedHead
)
+
sizeof
(
int32_t
));
(
*
ppHead
)
->
ssize
=
sizeof
(
TqSerializedHead
)
+
sizeof
(
int32_t
);
}
*
(
int32_t
*
)(
*
ppHead
)
->
content
=
foo
->
a
;
return
(
*
ppHead
)
->
ssize
;
}
const
void
*
FooDeserializer
(
const
TqSerializedHead
*
pHead
,
void
**
ppObj
)
{
if
(
*
ppObj
==
NULL
)
{
*
ppObj
=
realloc
(
*
ppObj
,
sizeof
(
int32_t
));
}
Foo
*
pFoo
=
*
(
Foo
**
)
ppObj
;
pFoo
->
a
=
*
(
int32_t
*
)
pHead
->
content
;
return
NULL
;
}
void
FooDeleter
(
void
*
pObj
)
{
free
(
pObj
);
}
class
TqMetaTest
:
public
::
testing
::
Test
{
protected:
void
SetUp
()
override
{
taosRemoveDir
(
pathName
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
ASSERT
(
pMeta
);
}
void
TearDown
()
override
{
tqStoreClose
(
pMeta
);
}
TqMetaStore
*
pMeta
;
const
char
*
pathName
=
"/tmp/tq_test"
;
};
TEST_F
(
TqMetaTest
,
copyPutTest
)
{
Foo
foo
;
foo
.
a
=
3
;
tqHandleCopyPut
(
pMeta
,
1
,
&
foo
,
sizeof
(
Foo
));
Foo
*
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaTest
,
persistTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
2
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
Foo
*
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pBar
==
NULL
,
true
);
tqHandleCommit
(
pMeta
,
1
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pBar
->
a
,
pFoo
->
a
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
2
);
EXPECT_EQ
(
pBar
==
NULL
,
true
);
tqStoreClose
(
pMeta
);
pMeta
=
tqStoreOpen
(
pathName
,
FooSerializer
,
FooDeserializer
,
FooDeleter
);
ASSERT
(
pMeta
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
ASSERT_EQ
(
pBar
!=
NULL
,
true
);
EXPECT_EQ
(
pBar
->
a
,
2
);
pBar
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
2
);
EXPECT_EQ
(
pBar
==
NULL
,
true
);
//taosRemoveDir(pathName);
}
TEST_F
(
TqMetaTest
,
uncommittedTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaTest
,
abortTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
tqHandleAbort
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
TEST_F
(
TqMetaTest
,
deleteTest
)
{
Foo
*
pFoo
=
(
Foo
*
)
malloc
(
sizeof
(
Foo
));
pFoo
->
a
=
3
;
tqHandleMovePut
(
pMeta
,
1
,
pFoo
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
tqHandleCommit
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
);
EXPECT_EQ
(
pFoo
->
a
,
3
);
tqHandleDel
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
ASSERT_EQ
(
pFoo
!=
NULL
,
true
);
EXPECT_EQ
(
pFoo
->
a
,
3
);
tqHandleCommit
(
pMeta
,
1
);
pFoo
=
(
Foo
*
)
tqHandleGet
(
pMeta
,
1
);
EXPECT_EQ
(
pFoo
==
NULL
,
true
);
}
source/dnode/vnode/tq/test/tqTests.cpp
已删除
100644 → 0
浏览文件 @
c5b2a9e0
source/os/src/osDir.c
浏览文件 @
43434732
...
@@ -55,7 +55,7 @@ void taosRemoveDir(const char *dirname) {
...
@@ -55,7 +55,7 @@ void taosRemoveDir(const char *dirname) {
closedir
(
dir
);
closedir
(
dir
);
rmdir
(
dirname
);
rmdir
(
dirname
);
printf
(
"dir:%s is removed"
,
dirname
);
printf
(
"dir:%s is removed
\n
"
,
dirname
);
}
}
bool
taosDirExist
(
char
*
dirname
)
{
return
access
(
dirname
,
F_OK
)
==
0
;
}
bool
taosDirExist
(
char
*
dirname
)
{
return
access
(
dirname
,
F_OK
)
==
0
;
}
...
@@ -138,4 +138,4 @@ bool taosRealPath(char *dirname, int32_t maxlen) {
...
@@ -138,4 +138,4 @@ bool taosRealPath(char *dirname, int32_t maxlen) {
return
true
;
return
true
;
}
}
#endif
#endif
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录