Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6d55ee00
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6d55ee00
编写于
11月 01, 2021
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
implement tq meta
上级
5294b54c
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
238 addition
and
41 deletion
+238
-41
source/server/vnode/tq/inc/tqMetaStore.h
source/server/vnode/tq/inc/tqMetaStore.h
+19
-14
source/server/vnode/tq/src/tq.c
source/server/vnode/tq/src/tq.c
+13
-13
source/server/vnode/tq/src/tqMetaStore.c
source/server/vnode/tq/src/tqMetaStore.c
+206
-14
未找到文件。
source/server/vnode/tq/inc/tqMetaStore.h
浏览文件 @
6d55ee00
...
...
@@ -17,9 +17,9 @@
#define _TQ_META_STORE_H_
#include "os.h"
#include "tq.h"
#define TQ_INUSE_SIZE 0xFF
#define TQ_PAGE_SIZE 4096
#define TQ_BUCKET_SIZE 0xFF
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -35,34 +35,39 @@ typedef struct TqMetaHandle {
typedef
struct
TqMetaList
{
TqMetaHandle
handle
;
struct
TqMetaList
*
next
;
struct
TqMetaList
*
inTxnPrev
;
struct
TqMetaList
*
inTxnNext
;
//
struct TqMetaList* inTxnPrev;
//
struct TqMetaList* inTxnNext;
struct
TqMetaList
*
unpersistPrev
;
struct
TqMetaList
*
unpersistNext
;
}
TqMetaList
;
typedef
struct
TqMetaStore
{
TqMetaList
*
inUse
[
TQ_INUSE
_SIZE
];
TqMetaList
*
bucket
[
TQ_BUCKET
_SIZE
];
//a table head, key is empty
TqMetaList
*
unpersistHead
;
int
fileFd
;
//TODO:temporaral use
int
idxFd
;
//TODO:temporaral use
void
*
(
*
serializer
)(
void
*
);
void
*
(
*
deserializer
)(
void
*
);
int
fileFd
;
//TODO:temporaral use
, to be replaced by unified tfile
int
idxFd
;
//TODO:temporaral use
, to be replaced by unified tfile
int
(
*
serializer
)(
TqGroupHandle
*
,
void
*
*
);
const
void
*
(
*
deserializer
)(
const
void
*
,
TqGroupHandle
*
);
void
(
*
deleter
)(
void
*
);
}
TqMetaStore
;
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
void
*
serializer
(
void
*
),
void
*
deserializer
(
void
*
),
void
deleter
(
void
*
));
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
int
serializer
(
TqGroupHandle
*
,
void
**
),
const
void
*
deserializer
(
const
void
*
,
TqGroupHandle
*
),
void
deleter
(
void
*
));
int32_t
tqStoreClose
(
TqMetaStore
*
);
int32_t
tqStoreDelete
(
TqMetaStore
*
);
//
int32_t tqStoreDelete(TqMetaStore*);
//int32_t TqStoreCommitAll(TqMetaStore*);
int32_t
tqStorePersist
(
TqMetaStore
*
);
TqMetaHandle
*
tqHandleGetInUse
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandlePutInUse
(
TqMetaStore
*
,
TqMetaHandle
*
handl
e
);
int32_t
tqHandlePutInUse
(
TqMetaStore
*
,
int64_t
key
,
void
*
valu
e
);
TqMetaHandle
*
tqHandleGetInTxn
(
TqMetaStore
*
,
int64_t
key
);
int32_t
tqHandlePutInTxn
(
TqMetaStore
*
,
TqMetaHandle
*
handle
);
//delete in-use-handle, make in-txn-handle in use
int32_t
tqHandlePutInTxn
(
TqMetaStore
*
,
int64_t
key
,
void
*
value
);
//will replace old handle
//int32_t tqHandlePut(TqMetaStore*, TqMetaHandle* handle);
//delete in-use-handle, and put it in use
int32_t
tqHandleCommit
(
TqMetaStore
*
,
int64_t
key
);
//delete in-txn-handle
int32_t
tqHandleAbort
(
TqMetaStore
*
,
int64_t
key
);
...
...
source/server/vnode/tq/src/tq.c
浏览文件 @
6d55ee00
...
...
@@ -49,12 +49,12 @@ static int tqAckOneTopic(TqBufferHandle *bhandle, TmqOneAck *pAck, TqQueryMsg**
return
0
;
}
static
int
tqAck
(
TqGroupHandle
*
g
h
andle
,
TmqAcks
*
pAcks
)
{
static
int
tqAck
(
TqGroupHandle
*
g
H
andle
,
TmqAcks
*
pAcks
)
{
int32_t
ackNum
=
pAcks
->
ackNum
;
TmqOneAck
*
acks
=
pAcks
->
acks
;
//double ptr for acks and list
int
i
=
0
;
TqListHandle
*
node
=
g
h
andle
->
head
;
TqListHandle
*
node
=
g
H
andle
->
head
;
int
ackCnt
=
0
;
TqQueryMsg
*
pQuery
=
NULL
;
while
(
i
<
ackNum
&&
node
->
next
)
{
...
...
@@ -99,8 +99,8 @@ int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
return
0
;
}
static
int
tqFetch
(
TqGroupHandle
*
g
h
andle
,
void
**
msg
)
{
TqListHandle
*
head
=
g
h
andle
->
head
;
static
int
tqFetch
(
TqGroupHandle
*
g
H
andle
,
void
**
msg
)
{
TqListHandle
*
head
=
g
H
andle
->
head
;
TqListHandle
*
node
=
head
;
int
totSize
=
0
;
//TODO: make it a macro
...
...
@@ -148,7 +148,7 @@ TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) {
return
NULL
;
}
int
tqLaunchQuery
(
TqGroupHandle
*
g
h
andle
)
{
int
tqLaunchQuery
(
TqGroupHandle
*
g
H
andle
)
{
return
0
;
}
...
...
@@ -156,7 +156,7 @@ int tqSendLaunchQuery(TqGroupHandle* gHandle) {
return
0
;
}
/*int tqMoveOffsetToNext(TqGroupHandle* g
h
andle) {*/
/*int tqMoveOffsetToNext(TqGroupHandle* g
H
andle) {*/
/*return 0;*/
/*}*/
...
...
@@ -177,13 +177,13 @@ int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
return
-
1
;
}
int64_t
clientId
=
pMsg
->
head
.
clientId
;
TqGroupHandle
*
g
h
andle
=
tqGetGroupHandle
(
pTq
,
clientId
);
if
(
g
h
andle
==
NULL
)
{
TqGroupHandle
*
g
H
andle
=
tqGetGroupHandle
(
pTq
,
clientId
);
if
(
g
H
andle
==
NULL
)
{
//client not connect
return
-
1
;
}
if
(
pMsg
->
acks
.
ackNum
!=
0
)
{
if
(
tqAck
(
g
h
andle
,
&
pMsg
->
acks
)
!=
0
)
{
if
(
tqAck
(
g
H
andle
,
&
pMsg
->
acks
)
!=
0
)
{
//ack not success
return
-
1
;
}
...
...
@@ -191,13 +191,13 @@ int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
TmqConsumeRsp
*
pRsp
=
(
TmqConsumeRsp
*
)
pMsg
;
if
(
tqFetch
(
g
h
andle
,
(
void
**
)
&
pRsp
->
msgs
)
<=
0
)
{
if
(
tqFetch
(
g
H
andle
,
(
void
**
)
&
pRsp
->
msgs
)
<=
0
)
{
//fetch error
return
-
1
;
}
//judge and launch new query
if
(
tqLaunchQuery
(
g
h
andle
))
{
if
(
tqLaunchQuery
(
g
H
andle
))
{
//launch query error
return
-
1
;
}
...
...
@@ -206,7 +206,7 @@ int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
int
tqSerializeGroupHandle
(
TqGroupHandle
*
gHandle
,
void
**
ppBytes
)
{
//calculate size
int
sz
=
tqGet
G
HandleSSize
(
gHandle
);
int
sz
=
tqGet
g
HandleSSize
(
gHandle
);
void
*
ptr
=
realloc
(
*
ppBytes
,
sz
);
if
(
ptr
==
NULL
)
{
free
(
ppBytes
);
...
...
@@ -313,7 +313,7 @@ const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) {
}
//TODO: make this a macro
int
tqGet
G
HandleSSize
(
const
TqGroupHandle
*
gHandle
)
{
int
tqGet
g
HandleSSize
(
const
TqGroupHandle
*
gHandle
)
{
return
sizeof
(
int64_t
)
*
2
+
sizeof
(
int32_t
)
+
gHandle
->
topicNum
*
tqBufHandleSSize
();
...
...
source/server/vnode/tq/src/tqMetaStore.c
浏览文件 @
6d55ee00
...
...
@@ -15,17 +15,28 @@
#include "tqMetaStore.h"
//TODO:replace by a abstract file layer
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#define TQ_PAGE_SIZE 4096
#define TQ_META_NAME "tq.meta"
#define TQ_IDX_NAME "tq.idx"
typedef
struct
TqMetaPageBuf
{
int16_t
offset
;
char
buffer
[
TQ_PAGE_SIZE
];
}
TqMetaPageBuf
;
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
void
*
serializer
(
void
*
),
void
*
deserializer
(
void
*
),
void
deleter
(
void
*
))
{
TqMetaStore
*
tqStoreOpen
(
const
char
*
path
,
int
serializer
(
TqGroupHandle
*
,
void
**
),
const
void
*
deserializer
(
const
void
*
,
TqGroupHandle
*
),
void
deleter
(
void
*
))
{
//concat data file name and index file name
int
fileFd
=
open
(
path
,
O_WRONLY
|
O_CREAT
|
O_EXCL
,
0755
);
size_t
pathLen
=
strlen
(
path
);
char
name
[
pathLen
+
10
];
strcpy
(
name
,
path
);
strcat
(
name
,
"/"
TQ_META_NAME
);
int
fileFd
=
open
(
name
,
O_WRONLY
|
O_CREAT
|
O_EXCL
,
0755
);
if
(
fileFd
<
0
)
return
NULL
;
TqMetaStore
*
pMeta
=
malloc
(
sizeof
(
TqMetaStore
));
if
(
pMeta
==
NULL
)
{
...
...
@@ -35,7 +46,9 @@ TqMetaStore* tqStoreOpen(const char* path, void* serializer(void*),
memset
(
pMeta
,
0
,
sizeof
(
TqMetaStore
));
pMeta
->
fileFd
=
fileFd
;
int
idxFd
=
open
(
path
,
O_WRONLY
|
O_CREAT
|
O_EXCL
,
0755
);
strcpy
(
name
,
path
);
strcat
(
name
,
"/"
TQ_IDX_NAME
);
int
idxFd
=
open
(
name
,
O_WRONLY
|
O_CREAT
|
O_EXCL
,
0755
);
if
(
idxFd
<
0
)
{
//close file
//free memory
...
...
@@ -56,8 +69,29 @@ TqMetaStore* tqStoreOpen(const char* path, void* serializer(void*),
int32_t
tqStoreClose
(
TqMetaStore
*
pMeta
)
{
//commit data and idx
//close file
tqStorePersist
(
pMeta
);
ASSERT
(
pMeta
->
unpersistHead
&&
pMeta
->
unpersistHead
->
next
==
NULL
);
close
(
pMeta
->
fileFd
);
close
(
pMeta
->
idxFd
);
//free memory
for
(
int
i
=
0
;
i
<
TQ_BUCKET_SIZE
;
i
++
)
{
TqMetaList
*
node
=
pMeta
->
bucket
[
i
];
pMeta
->
bucket
[
i
]
=
NULL
;
while
(
node
)
{
ASSERT
(
node
->
unpersistNext
==
NULL
);
ASSERT
(
node
->
unpersistPrev
==
NULL
);
if
(
node
->
handle
.
valueInTxn
)
{
pMeta
->
deleter
(
node
->
handle
.
valueInTxn
);
}
if
(
node
->
handle
.
valueInUse
)
{
pMeta
->
deleter
(
node
->
handle
.
valueInUse
);
}
TqMetaList
*
next
=
node
->
next
;
free
(
node
);
node
=
next
;
}
}
free
(
pMeta
);
return
0
;
}
...
...
@@ -69,44 +103,202 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) {
}
int32_t
tqStorePersist
(
TqMetaStore
*
pMeta
)
{
TqMetaList
*
node
=
pMeta
->
unpersistHead
;
while
(
node
->
unpersistNext
)
{
int64_t
idxBuf
[
3
];
TqMetaList
*
pHead
=
pMeta
->
unpersistHead
;
TqMetaList
*
pNode
=
pHead
->
unpersistNext
;
while
(
pHead
!=
pNode
)
{
ASSERT
(
pNode
->
handle
.
valueInUse
!=
NULL
);
//serialize
void
*
pBytes
=
NULL
;
int
sz
=
pMeta
->
serializer
(
pNode
->
handle
.
valueInUse
,
&
pBytes
);
ASSERT
(
pBytes
!=
NULL
);
//get current offset
//append data
//write offset and idx
int
nBytes
=
write
(
pMeta
->
fileFd
,
pBytes
,
sz
);
//TODO: handle error in tfile
ASSERT
(
nBytes
==
sz
);
//write idx
//TODO: endian check and convert
idxBuf
[
0
]
=
pNode
->
handle
.
key
;
idxBuf
[
1
]
=
pNode
->
handle
.
offset
;
idxBuf
[
2
]
=
(
int64_t
)
sz
;
nBytes
=
write
(
pMeta
->
idxFd
,
idxBuf
,
sizeof
(
idxBuf
));
//TODO: handle error in tfile
ASSERT
(
nBytes
==
sizeof
(
idxBuf
));
//remove from unpersist list
pHead
->
unpersistNext
=
pNode
->
unpersistNext
;
pHead
->
unpersistNext
->
unpersistPrev
=
pHead
;
pNode
->
unpersistPrev
=
pNode
->
unpersistNext
=
NULL
;
pNode
=
pHead
->
unpersistNext
;
}
//TODO:fsync and return upper layer
return
0
;
}
int32_t
tqHandlePutInUse
(
TqMetaStore
*
pMeta
,
TqMetaHandle
*
handle
)
{
int32_t
tqHandlePutInUse
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
//TODO: think about thread safety
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
//change pointer ownership
pNode
->
handle
.
valueInUse
=
value
;
}
else
{
pNode
=
pNode
->
next
;
}
}
return
0
;
}
TqMetaHandle
*
tqHandleGetInUse
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
return
&
pNode
->
handle
;
}
else
{
return
NULL
;
}
}
else
{
pNode
=
pNode
->
next
;
}
}
return
NULL
;
}
int32_t
tqHandlePutInTxn
(
TqMetaStore
*
pMeta
,
TqMetaHandle
*
handle
)
{
int32_t
tqHandlePutInTxn
(
TqMetaStore
*
pMeta
,
int64_t
key
,
void
*
value
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
//TODO: think about thread safety
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
//change pointer ownership
pNode
->
handle
.
valueInTxn
=
value
;
}
else
{
pNode
=
pNode
->
next
;
}
}
return
0
;
}
TqMetaHandle
*
tqHandleGetInTxn
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
)
{
return
&
pNode
->
handle
;
}
else
{
return
NULL
;
}
}
else
{
pNode
=
pNode
->
next
;
}
}
return
NULL
;
}
int32_t
tqHandleCommit
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
}
pNode
->
handle
.
valueInUse
=
pNode
->
handle
.
valueInTxn
;
if
(
pNode
->
unpersistNext
==
NULL
)
{
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNode
;
}
return
0
;
}
else
{
pNode
=
pNode
->
next
;
}
}
return
-
1
;
}
int32_t
tqHandleAbort
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
pNode
->
handle
.
valueInTxn
=
NULL
;
return
0
;
}
return
-
1
;
}
else
{
pNode
=
pNode
->
next
;
}
}
return
-
2
;
}
int32_t
tqHandleDel
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
pNode
->
handle
.
valueInUse
=
NULL
;
//if not in unpersist, put into unpersist
if
(
pNode
->
unpersistNext
==
NULL
)
{
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNode
;
}
return
0
;
}
return
-
1
;
}
else
{
pNode
=
pNode
->
next
;
}
}
return
-
2
;
}
int32_t
tqHandleClear
(
TqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_SIZE
;
TqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
bool
exist
=
false
;
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInUse
!=
NULL
)
{
exist
=
true
;
pMeta
->
deleter
(
pNode
->
handle
.
valueInUse
);
pNode
->
handle
.
valueInUse
=
NULL
;
}
if
(
pNode
->
handle
.
valueInTxn
!=
NULL
)
{
exist
=
true
;
pMeta
->
deleter
(
pNode
->
handle
.
valueInTxn
);
pNode
->
handle
.
valueInTxn
=
NULL
;
}
if
(
exist
)
{
if
(
pNode
->
unpersistNext
==
NULL
)
{
pNode
->
unpersistNext
=
pMeta
->
unpersistHead
->
unpersistNext
;
pNode
->
unpersistPrev
=
pMeta
->
unpersistHead
;
pMeta
->
unpersistHead
->
unpersistNext
->
unpersistPrev
=
pNode
;
pMeta
->
unpersistHead
->
unpersistNext
=
pNode
;
}
return
0
;
}
return
-
1
;
}
else
{
pNode
=
pNode
->
next
;
}
}
return
-
2
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录