Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f36d487e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f36d487e
编写于
11月 02, 2021
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
implement tq meta file store
上级
e42f0280
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
140 addition
and
43 deletion
+140
-43
include/server/vnode/tq/tq.h
include/server/vnode/tq/tq.h
+3
-0
source/server/vnode/tq/inc/tqMetaStore.h
source/server/vnode/tq/inc/tqMetaStore.h
+15
-3
source/server/vnode/tq/src/tq.c
source/server/vnode/tq/src/tq.c
+5
-5
source/server/vnode/tq/src/tqMetaStore.c
source/server/vnode/tq/src/tqMetaStore.c
+117
-35
未找到文件。
include/server/vnode/tq/tq.h
浏览文件 @
f36d487e
...
...
@@ -19,6 +19,9 @@
#include "os.h"
#include "tutil.h"
#define TQ_ACTION_INSERT 0x7f7f7f7fULL
#define TQ_ACTION_DELETE 0x80808080ULL
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
source/server/vnode/tq/inc/tqMetaStore.h
浏览文件 @
f36d487e
...
...
@@ -20,6 +20,17 @@
#include "tq.h"
#define TQ_BUCKET_SIZE 0xFF
#define TQ_PAGE_SIZE 4096
//key + offset + size
#define TQ_IDX_ENTRY_SIZE 24
inline
static
int
TqMaxEntryOnePage
()
{
//170
return
TQ_PAGE_SIZE
/
TQ_IDX_ENTRY_SIZE
;
}
inline
static
int
TqEmptyTail
()
{
//16
return
TQ_PAGE_SIZE
-
TqMaxEntryOnePage
();
}
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -28,8 +39,9 @@ extern "C" {
typedef
struct
TqMetaHandle
{
int64_t
key
;
int64_t
offset
;
void
*
valueInUse
;
void
*
valueInTxn
;
int64_t
serializedSize
;
void
*
valueInUse
;
void
*
valueInTxn
;
}
TqMetaHandle
;
typedef
struct
TqMetaList
{
...
...
@@ -43,7 +55,7 @@ typedef struct TqMetaList {
typedef
struct
TqMetaStore
{
TqMetaList
*
bucket
[
TQ_BUCKET_SIZE
];
//a table head
, key is empty
//a table head
TqMetaList
*
unpersistHead
;
int
fileFd
;
//TODO:temporaral use, to be replaced by unified tfile
int
idxFd
;
//TODO:temporaral use, to be replaced by unified tfile
...
...
source/server/vnode/tq/src/tq.c
浏览文件 @
f36d487e
...
...
@@ -26,12 +26,12 @@ static int tqProtoCheck(TmqMsgHead *pMsg) {
return
pMsg
->
protoVer
==
0
;
}
static
int
tqAckOneTopic
(
TqBufferHandle
*
b
h
andle
,
TmqOneAck
*
pAck
,
TqQueryMsg
**
ppQuery
)
{
static
int
tqAckOneTopic
(
TqBufferHandle
*
b
H
andle
,
TmqOneAck
*
pAck
,
TqQueryMsg
**
ppQuery
)
{
//clean old item and move forward
int32_t
consumeOffset
=
pAck
->
consumeOffset
;
int
idx
=
consumeOffset
%
TQ_BUFFER_SIZE
;
ASSERT
(
b
handle
->
buffer
[
idx
].
content
&&
bh
andle
->
buffer
[
idx
].
executor
);
tfree
(
b
h
andle
->
buffer
[
idx
].
content
);
ASSERT
(
b
Handle
->
buffer
[
idx
].
content
&&
bH
andle
->
buffer
[
idx
].
executor
);
tfree
(
b
H
andle
->
buffer
[
idx
].
content
);
if
(
1
/* TODO: need to launch new query */
)
{
TqQueryMsg
*
pNewQuery
=
malloc
(
sizeof
(
TqQueryMsg
));
if
(
pNewQuery
==
NULL
)
{
...
...
@@ -39,10 +39,10 @@ static int tqAckOneTopic(TqBufferHandle *bhandle, TmqOneAck *pAck, TqQueryMsg**
return
-
1
;
}
//TODO: lock executor
pNewQuery
->
exec
->
executor
=
b
h
andle
->
buffer
[
idx
].
executor
;
pNewQuery
->
exec
->
executor
=
b
H
andle
->
buffer
[
idx
].
executor
;
//TODO: read from wal and assign to src
pNewQuery
->
exec
->
src
=
0
;
pNewQuery
->
exec
->
dest
=
&
b
h
andle
->
buffer
[
idx
];
pNewQuery
->
exec
->
dest
=
&
b
H
andle
->
buffer
[
idx
];
pNewQuery
->
next
=
*
ppQuery
;
*
ppQuery
=
pNewQuery
;
}
...
...
source/server/vnode/tq/src/tqMetaStore.c
浏览文件 @
f36d487e
...
...
@@ -13,12 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tqMetaStore.h"
//TODO:replace by a abstract file layer
//TODO:replace by a
n
abstract file layer
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#define TQ_PAGE_SIZE 4096
#define TQ_META_NAME "tq.meta"
#define TQ_IDX_NAME "tq.idx"
...
...
@@ -35,21 +34,16 @@ TqMetaStore* tqStoreOpen(const char* path,
int
serializer
(
TqGroupHandle
*
,
void
**
),
const
void
*
deserializer
(
const
void
*
,
TqGroupHandle
*
),
void
deleter
(
void
*
))
{
//concat data file name and index file name
size_t
pathLen
=
strlen
(
path
);
char
name
[
pathLen
+
10
];
strcpy
(
name
,
path
);
strcat
(
name
,
"/"
TQ_META_NAME
);
int
fileFd
=
open
(
name
,
O_WRONLY
|
O_CREAT
|
O_EXCL
,
0755
);
if
(
fileFd
<
0
)
return
NULL
;
TqMetaStore
*
pMeta
=
malloc
(
sizeof
(
TqMetaStore
));
if
(
pMeta
==
NULL
)
{
//close
return
NULL
;
}
memset
(
pMeta
,
0
,
sizeof
(
TqMetaStore
));
pMeta
->
fileFd
=
fileFd
;
//concat data file name and index file name
size_t
pathLen
=
strlen
(
path
);
char
name
[
pathLen
+
10
];
strcpy
(
name
,
path
);
strcat
(
name
,
"/"
TQ_IDX_NAME
);
int
idxFd
=
open
(
name
,
O_WRONLY
|
O_CREAT
|
O_EXCL
,
0755
);
...
...
@@ -58,6 +52,7 @@ TqMetaStore* tqStoreOpen(const char* path,
//free memory
return
NULL
;
}
pMeta
->
idxFd
=
idxFd
;
pMeta
->
unpersistHead
=
malloc
(
sizeof
(
TqMetaList
));
if
(
pMeta
->
unpersistHead
==
NULL
)
{
...
...
@@ -65,9 +60,37 @@ TqMetaStore* tqStoreOpen(const char* path,
//free memory
return
NULL
;
}
strcpy
(
name
,
path
);
strcat
(
name
,
"/"
TQ_META_NAME
);
int
fileFd
=
open
(
name
,
O_WRONLY
|
O_CREAT
|
O_EXCL
,
0755
);
if
(
fileFd
<
0
)
return
NULL
;
memset
(
pMeta
,
0
,
sizeof
(
TqMetaStore
));
pMeta
->
fileFd
=
fileFd
;
pMeta
->
serializer
=
serializer
;
pMeta
->
deserializer
=
deserializer
;
pMeta
->
deleter
=
deleter
;
//read idx file and load into memory
char
readBuf
[
TQ_PAGE_SIZE
];
int
readSize
;
while
((
readSize
=
read
(
idxFd
,
readBuf
,
TQ_PAGE_SIZE
))
!=
-
1
)
{
//loop read every entry
for
(
int
i
=
0
;
i
<
readSize
;
i
+=
TQ_IDX_ENTRY_SIZE
)
{
TqMetaList
*
pNode
=
malloc
(
sizeof
(
TqMetaHandle
));
memset
(
pNode
,
0
,
sizeof
(
TqMetaList
));
if
(
pNode
==
NULL
)
{
//TODO: free memory and return error
}
memcpy
(
&
pNode
->
handle
,
&
readBuf
[
i
],
TQ_IDX_ENTRY_SIZE
);
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_SIZE
;
pNode
->
next
=
pMeta
->
bucket
[
bucketKey
];
pMeta
->
bucket
[
bucketKey
]
=
pNode
;
}
}
return
pMeta
;
}
...
...
@@ -106,30 +129,66 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) {
return
0
;
}
//TODO: wrap in tfile
int32_t
tqStorePersist
(
TqMetaStore
*
pMeta
)
{
int64_t
idxBuf
[
3
];
char
writeBuf
[
TQ_PAGE_SIZE
];
int64_t
*
bufPtr
=
(
int64_t
*
)
writeBuf
;
TqMetaList
*
pHead
=
pMeta
->
unpersistHead
;
TqMetaList
*
pNode
=
pHead
->
unpersistNext
;
while
(
pHead
!=
pNode
)
{
ASSERT
(
pNode
->
handle
.
valueInUse
!=
NULL
);
if
(
pNode
->
handle
.
valueInUse
==
NULL
)
{
//put delete token in data file
uint32_t
delete
=
TQ_ACTION_DELETE
;
int
nBytes
=
write
(
pMeta
->
fileFd
,
&
delete
,
sizeof
(
uint32_t
));
ASSERT
(
nBytes
==
sizeof
(
uint32_t
));
//remove from list
int
bucketKey
=
pNode
->
handle
.
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pBucketHead
=
pMeta
->
bucket
[
bucketKey
];
if
(
pBucketHead
==
pNode
)
{
pMeta
->
bucket
[
bucketKey
]
=
pBucketHead
->
next
;
}
else
{
TqMetaList
*
pBucketNode
=
pBucketHead
;
while
(
pBucketNode
->
next
!=
NULL
&&
pBucketNode
->
next
!=
pNode
)
{
pBucketNode
=
pBucketNode
->
next
;
}
if
(
pBucketNode
->
next
!=
NULL
)
{
ASSERT
(
pBucketNode
->
next
==
pNode
);
pBucketNode
->
next
=
pNode
->
next
;
if
(
pNode
->
handle
.
valueInUse
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
free
(
pNode
);
}
}
}
//serialize
void
*
pBytes
=
NULL
;
int
sz
=
pMeta
->
serializer
(
pNode
->
handle
.
valueInUse
,
&
pBytes
);
ASSERT
(
pBytes
!=
NULL
);
//get current offset
//append data
int64_t
offset
=
lseek
(
pMeta
->
fileFd
,
0
,
SEEK_CUR
);
int
nBytes
=
write
(
pMeta
->
fileFd
,
pBytes
,
sz
);
//TODO: handle error in tfile
ASSERT
(
nBytes
==
sz
);
pNode
->
handle
.
offset
=
offset
;
pNode
->
handle
.
serializedSize
=
sz
;
//write idx
//TODO: endian check and convert
idxBuf
[
0
]
=
pNode
->
handle
.
key
;
idxBuf
[
1
]
=
pNode
->
handle
.
offset
;
idxBuf
[
2
]
=
(
int64_t
)
sz
;
nBytes
=
write
(
pMeta
->
idxFd
,
idxBuf
,
sizeof
(
idxBuf
));
//TODO: handle error in tfile
ASSERT
(
nBytes
==
sizeof
(
idxBuf
));
*
(
bufPtr
++
)
=
pNode
->
handle
.
key
;
*
(
bufPtr
++
)
=
pNode
->
handle
.
offset
;
*
(
bufPtr
++
)
=
(
int64_t
)
sz
;
if
((
char
*
)(
bufPtr
+
3
)
>
writeBuf
+
TQ_PAGE_SIZE
)
{
nBytes
=
write
(
pMeta
->
idxFd
,
writeBuf
,
sizeof
(
writeBuf
));
//TODO: handle error in tfile
ASSERT
(
nBytes
==
sizeof
(
writeBuf
));
memset
(
writeBuf
,
0
,
TQ_PAGE_SIZE
);
bufPtr
=
(
int64_t
*
)
writeBuf
;
}
//remove from unpersist list
pHead
->
unpersistNext
=
pNode
->
unpersistNext
;
...
...
@@ -138,7 +197,16 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
pNode
->
unpersistPrev
=
pNode
->
unpersistNext
=
NULL
;
pNode
=
pHead
->
unpersistNext
;
}
//TODO:fsync and return upper layer
//write left bytes
if
((
char
*
)
bufPtr
!=
writeBuf
)
{
int
used
=
(
char
*
)
bufPtr
-
writeBuf
;
int
nBytes
=
write
(
pMeta
->
idxFd
,
writeBuf
,
used
);
//TODO: handle error in tfile
ASSERT
(
nBytes
==
used
);
}
//TODO: using fsync in tfile
fsync
(
pMeta
->
idxFd
);
fsync
(
pMeta
->
fileFd
);
return
0
;
}
...
...
@@ -151,10 +219,24 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
//change pointer ownership
pNode
->
handle
.
valueInUse
=
value
;
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
TqMetaList
*
pNewNode
=
malloc
(
sizeof
(
TqMetaList
));
if
(
pNewNode
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInUse
=
value
;
//put into unpersist list
pNewNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pNewNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNewNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNewNode
;
return
0
;
}
...
...
@@ -184,10 +266,19 @@ int32_t tqHandlePut(TqMetaStore* pMeta, int64_t key, void* value) {
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
//change pointer ownership
pNode
->
handle
.
valueInTxn
=
value
;
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
TqMetaList
*
pNewNode
=
malloc
(
sizeof
(
TqMetaList
));
if
(
pNewNode
==
NULL
)
{
//TODO: memory error
return
-
1
;
}
memset
(
pNewNode
,
0
,
sizeof
(
TqMetaList
));
pNewNode
->
handle
.
key
=
key
;
pNewNode
->
handle
.
valueInTxn
=
value
;
return
0
;
}
...
...
@@ -254,24 +345,15 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
pNode
->
handle
.
valueInUse
=
NULL
;
//if not in unpersist, put into unpersist
if
(
pNode
->
unpersistNext
==
NULL
)
{
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNode
;
}
return
0
;
}
return
-
1
;
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
pNode
->
handle
.
valueInTxn
=
NULL
;
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
return
-
2
;
//no such key
return
-
1
;
}
int32_t
tqHandleClear
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录