Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a1fbaf30
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a1fbaf30
编写于
10月 13, 2021
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
tq data structure defined
上级
dd9ac9bb
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
98 addition
and
10 deletion
+98
-10
.gitignore
.gitignore
+3
-1
include/server/vnode/tq/tq.h
include/server/vnode/tq/tq.h
+20
-2
source/server/vnode/tq/inc/tqInt.h
source/server/vnode/tq/inc/tqInt.h
+21
-6
source/server/vnode/tq/src/tq.c
source/server/vnode/tq/src/tq.c
+54
-1
未找到文件。
.gitignore
浏览文件 @
a1fbaf30
build/
compile_commands.json
.cache
.ycm_extra_conf.py
.vscode/
.idea/
...
...
@@ -96,4 +98,4 @@ tramp
TAGS
deps/*
!deps/CMakeLists.txt
\ No newline at end of file
!deps/CMakeLists.txt
include/server/vnode/tq/tq.h
浏览文件 @
a1fbaf30
...
...
@@ -22,8 +22,26 @@
extern
"C"
{
#endif
typedef
struct
STQ
STQ
;
typedef
struct
tqTopicVhandle
{
//name
//
//executor for filter
//
//callback for mnode
//
}
tqTopic
;
typedef
struct
STQ
{
//the set for topics
//key=topicName: str
//value=tqTopicVhandle
//a map
//key=<topic: str, cgId: int64_t>
//value=consumeOffset: int64_t
}
STQ
;
//init in each vnode
STQ
*
tqInit
(
void
*
ref_func
(
void
*
),
void
*
unref_func
(
void
*
));
void
tqCleanUp
(
STQ
*
);
...
...
source/server/vnode/tq/inc/tqInt.h
浏览文件 @
a1fbaf30
...
...
@@ -18,23 +18,38 @@
#include "tq.h"
#define TQ_BUFFER_SIZE 8
#ifdef __cplusplus
extern
"C"
{
#endif
//implement the array index
//implement the ring buffer
typedef
struct
tqBufferItem
{
int64_t
offset
;
void
*
content
;
}
tqBufferItem
;
typedef
struct
tqGroupHandle
{
char
*
topic
;
void
*
ahandle
;
int64_t
cgId
;
int64_t
consumeOffset
;
int32_t
head
;
int32_t
tail
;
tqBufferItem
buffer
[
TQ_BUFFER_SIZE
];
}
tqGroupHandle
;
//create persistent storage for meta info such as consuming offset
//return value > 0: cgId
//return value <= 0: error code
int
tqCreate
Group
(
STQ
*
);
int
tqCreate
TCGroup
(
STQ
*
,
const
char
*
topic
,
int
cgId
,
tqGroupHandle
**
handle
);
//create ring buffer in memory and load consuming offset
int
tqOpen
Group
(
STQ
*
,
int
cgId
);
int
tqOpen
TCGroup
(
STQ
*
,
const
char
*
topic
,
int
cgId
);
//destroy ring buffer and persist consuming offset
int
tqClose
Group
(
STQ
*
,
int
cgId
);
int
tqClose
TCGroup
(
STQ
*
,
const
char
*
topic
,
int
cgId
);
//delete persistent storage for meta info
int
tqDrop
Group
(
STQ
*
,
int
cgId
);
int
tqDrop
TCGroup
(
STQ
*
,
const
char
*
topic
,
int
cgId
);
#ifdef __cplusplus
}
...
...
source/server/vnode/tq/src/tq.c
浏览文件 @
a1fbaf30
...
...
@@ -22,12 +22,65 @@
//
//handle management message
static
tqGroupHandle
*
tqLookupGroupHandle
(
STQ
*
pTq
,
const
char
*
topic
,
int
cgId
)
{
//look in memory
//
//not found, try to restore from disk
//
//still not found
return
NULL
;
}
static
int
tqCommitTCGroup
(
tqGroupHandle
*
handle
)
{
//persist into disk
return
0
;
}
int
tqCreateTCGroup
(
STQ
*
pTq
,
const
char
*
topic
,
int
cgId
,
tqGroupHandle
**
handle
)
{
return
0
;
}
int
tqOpenTGroup
(
STQ
*
pTq
,
const
char
*
topic
,
int
cgId
)
{
int
code
;
tqGroupHandle
*
handle
=
tqLookupGroupHandle
(
pTq
,
topic
,
cgId
);
if
(
handle
==
NULL
)
{
code
=
tqCreateTCGroup
(
pTq
,
topic
,
cgId
,
&
handle
);
if
(
code
!=
0
)
{
return
code
;
}
}
ASSERT
(
handle
!=
NULL
);
//put into STQ
return
0
;
}
int
tqCloseTCGroup
(
STQ
*
pTq
,
const
char
*
topic
,
int
cgId
)
{
tqGroupHandle
*
handle
=
tqLookupGroupHandle
(
pTq
,
topic
,
cgId
);
return
tqCommitTCGroup
(
handle
);
}
int
tqDropTCGroup
(
STQ
*
pTq
,
const
char
*
topic
,
int
cgId
)
{
//delete from disk
return
0
;
}
int
tqPushMsg
(
STQ
*
pTq
,
void
*
p
,
int64_t
version
)
{
//add reference
//
//
judge and launch new query
return
0
;
}
int
tqCommit
(
STQ
*
pTq
)
{
//do nothing
return
0
;
}
int
tqHandleMsg
(
STQ
*
pTq
,
void
*
msg
)
{
//parse msg and extract topic and cgId
//lookup handle
//confirm message and send to consumer
//judge and launch new query
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录