Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3639999e
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3639999e
编写于
10月 17, 2021
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into 3.0_refact
上级
ed3beb24
13853c5a
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
333 addition
and
7 deletion
+333
-7
include/common/taosmsg.h
include/common/taosmsg.h
+5
-1
include/server/vnode/tq/tq.h
include/server/vnode/tq/tq.h
+52
-2
include/util/theap.h
include/util/theap.h
+65
-0
source/server/vnode/tq/inc/tqInt.h
source/server/vnode/tq/inc/tqInt.h
+4
-3
source/server/vnode/tq/src/tq.c
source/server/vnode/tq/src/tq.c
+1
-1
source/util/src/theap.c
source/util/src/theap.c
+206
-0
未找到文件。
include/common/taosmsg.h
浏览文件 @
3639999e
...
...
@@ -41,6 +41,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_QUERY
,
"query"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_FETCH
,
"fetch"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_UPDATE_TAG_VAL
,
"update-tag-val"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MQ_CONNECT
,
"mq-connect"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MQ_CONSUME
,
"mq-consume"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MQ_ACK
,
"mq-ack"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MQ_RESET
,
"mq-reset"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY1
,
"dummy1"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY2
,
"dummy2"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY3
,
"dummy3"
)
...
...
@@ -113,7 +117,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message for topic
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CM_CREATE_TP
,
"create-tp"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CM_DROP_TP
,
"drop-tp"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CM_USE_TP
,
"use-tp"
)
//
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CM_ALTER_TP
,
"alter-tp"
)
#ifndef TAOS_MESSAGE_C
...
...
include/server/vnode/tq/tq.h
浏览文件 @
3639999e
...
...
@@ -22,6 +22,56 @@
extern
"C"
{
#endif
typedef
struct
tmqMsgHead
{
int32_t
headLen
;
int32_t
msgVer
;
int64_t
cgId
;
int32_t
topicLen
;
char
topic
[];
}
tmqMsgHead
;
//TODO: put msgs into common
typedef
struct
tmqConnectReq
{
tmqMsgHead
head
;
}
tmqConnectReq
;
typedef
struct
tmqConnectResp
{
}
tmqConnectResp
;
typedef
struct
tmqDisconnectReq
{
}
tmqDisconnectReq
;
typedef
struct
tmqDisconnectResp
{
}
tmqDiconnectResp
;
typedef
struct
tmqConsumeReq
{
}
tmqConsumeReq
;
typedef
struct
tmqConsumeResp
{
}
tmqConsumeResp
;
typedef
struct
tmqSubscribeReq
{
}
tmqSubscribeReq
;
typedef
struct
tmqSubscribeResp
{
}
tmqSubscribeResp
;
typedef
struct
tmqHeartbeatReq
{
}
tmqHeartbeatReq
;
typedef
struct
tmqHeartbeatResp
{
}
tmqHeartbeatResp
;
typedef
struct
tqTopicVhandle
{
//name
//
...
...
@@ -29,7 +79,7 @@ typedef struct tqTopicVhandle {
//
//callback for mnode
//
}
tqTopic
;
}
tqTopic
Vhandle
;
typedef
struct
STQ
{
//the set for topics
...
...
@@ -50,7 +100,7 @@ int tqPushMsg(STQ*, void* msg, int64_t version);
int
tqCommit
(
STQ
*
);
//void* will be replace by a msg type
int
tqHandle
Msg
(
STQ
*
,
void
*
msg
);
int
tqHandle
ConsumeMsg
(
STQ
*
,
tmqConsumeReq
*
msg
);
#ifdef __cplusplus
}
...
...
include/util/theap.h
0 → 100644
浏览文件 @
3639999e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HEAP_H
#define TDENGINE_HEAP_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "os.h"
struct
HeapNode
;
/* Return non-zero if a < b. */
typedef
int
(
*
HeapCompareFn
)(
const
struct
HeapNode
*
a
,
const
struct
HeapNode
*
b
);
typedef
struct
HeapNode
{
struct
HeapNode
*
left
;
struct
HeapNode
*
right
;
struct
HeapNode
*
parent
;
}
HeapNode
;
/* A binary min heap. The usual properties hold: the root is the lowest
* element in the set, the height of the tree is at most log2(nodes) and
* it's always a complete binary tree.
*
*/
typedef
struct
{
HeapNode
*
min
;
size_t
nelts
;
HeapCompareFn
compFn
;
}
Heap
;
Heap
*
heapCreate
(
HeapCompareFn
fn
);
void
heapDestroy
(
Heap
*
heap
);
HeapNode
*
heapMin
(
const
Heap
*
heap
);
void
heapInsert
(
Heap
*
heap
,
HeapNode
*
node
);
void
heapRemove
(
Heap
*
heap
,
struct
HeapNode
*
node
);
void
heapDequeue
(
Heap
*
heap
);
size_t
heapSize
(
Heap
*
heap
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_HASH_H
source/server/vnode/tq/inc/tqInt.h
浏览文件 @
3639999e
...
...
@@ -26,14 +26,15 @@ extern "C" {
typedef
struct
tqBufferItem
{
int64_t
offset
;
void
*
content
;
void
*
executor
;
void
*
content
;
}
tqBufferItem
;
typedef
struct
tqGroupHandle
{
char
*
topic
;
void
*
ahandle
;
char
*
topic
;
//c style, end with '\0'
int64_t
cgId
;
void
*
ahandle
;
int64_t
consumeOffset
;
int32_t
head
;
int32_t
tail
;
...
...
source/server/vnode/tq/src/tq.c
浏览文件 @
3639999e
...
...
@@ -77,7 +77,7 @@ int tqCommit(STQ* pTq) {
return
0
;
}
int
tqHandle
Msg
(
STQ
*
pTq
,
void
*
msg
)
{
int
tqHandle
ConsumeMsg
(
STQ
*
pTq
,
tmqConsumeReq
*
msg
)
{
//parse msg and extract topic and cgId
//lookup handle
//confirm message and send to consumer
...
...
source/util/src/theap.c
0 → 100644
浏览文件 @
3639999e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "theap.h"
size_t
heapSize
(
Heap
*
heap
)
{
return
heap
->
nelts
;
}
Heap
*
heapCreate
(
HeapCompareFn
fn
)
{
Heap
*
heap
=
calloc
(
1
,
sizeof
(
Heap
));
if
(
heap
==
NULL
)
{
return
NULL
;
}
heap
->
min
=
NULL
;
heap
->
nelts
=
0
;
heap
->
compFn
=
fn
;
return
heap
;
}
void
heapDestroy
(
Heap
*
heap
)
{
free
(
heap
);
}
HeapNode
*
heapMin
(
const
Heap
*
heap
)
{
return
heap
->
min
;
}
/* Swap parent with child. Child moves closer to the root, parent moves away. */
static
void
heapNodeSwap
(
Heap
*
heap
,
HeapNode
*
parent
,
HeapNode
*
child
)
{
HeapNode
*
sibling
;
HeapNode
t
;
t
=
*
parent
;
*
parent
=
*
child
;
*
child
=
t
;
parent
->
parent
=
child
;
if
(
child
->
left
==
child
)
{
child
->
left
=
parent
;
sibling
=
child
->
right
;
}
else
{
child
->
right
=
parent
;
sibling
=
child
->
left
;
}
if
(
sibling
!=
NULL
)
sibling
->
parent
=
child
;
if
(
parent
->
left
!=
NULL
)
parent
->
left
->
parent
=
parent
;
if
(
parent
->
right
!=
NULL
)
parent
->
right
->
parent
=
parent
;
if
(
child
->
parent
==
NULL
)
heap
->
min
=
child
;
else
if
(
child
->
parent
->
left
==
parent
)
child
->
parent
->
left
=
child
;
else
child
->
parent
->
right
=
child
;
}
void
heapInsert
(
Heap
*
heap
,
HeapNode
*
newnode
)
{
HeapNode
**
parent
;
HeapNode
**
child
;
unsigned
int
path
;
unsigned
int
n
;
unsigned
int
k
;
newnode
->
left
=
NULL
;
newnode
->
right
=
NULL
;
newnode
->
parent
=
NULL
;
/* Calculate the path from the root to the insertion point. This is a min
* heap so we always insert at the left-most free node of the bottom row.
*/
path
=
0
;
for
(
k
=
0
,
n
=
1
+
heap
->
nelts
;
n
>=
2
;
k
+=
1
,
n
/=
2
)
path
=
(
path
<<
1
)
|
(
n
&
1
);
/* Now traverse the heap using the path we calculated in the previous step. */
parent
=
child
=
&
heap
->
min
;
while
(
k
>
0
)
{
parent
=
child
;
if
(
path
&
1
)
child
=
&
(
*
child
)
->
right
;
else
child
=
&
(
*
child
)
->
left
;
path
>>=
1
;
k
-=
1
;
}
/* Insert the new node. */
newnode
->
parent
=
*
parent
;
*
child
=
newnode
;
heap
->
nelts
+=
1
;
/* Walk up the tree and check at each node if the heap property holds.
* It's a min heap so parent < child must be true.
*/
while
(
newnode
->
parent
!=
NULL
&&
(
heap
->
compFn
)(
newnode
,
newnode
->
parent
))
heapNodeSwap
(
heap
,
newnode
->
parent
,
newnode
);
}
void
heapRemove
(
Heap
*
heap
,
HeapNode
*
node
)
{
HeapNode
*
smallest
;
HeapNode
**
max
;
HeapNode
*
child
;
unsigned
int
path
;
unsigned
int
k
;
unsigned
int
n
;
if
(
heap
->
nelts
==
0
)
return
;
/* Calculate the path from the min (the root) to the max, the left-most node
* of the bottom row.
*/
path
=
0
;
for
(
k
=
0
,
n
=
heap
->
nelts
;
n
>=
2
;
k
+=
1
,
n
/=
2
)
path
=
(
path
<<
1
)
|
(
n
&
1
);
/* Now traverse the heap using the path we calculated in the previous step. */
max
=
&
heap
->
min
;
while
(
k
>
0
)
{
if
(
path
&
1
)
max
=
&
(
*
max
)
->
right
;
else
max
=
&
(
*
max
)
->
left
;
path
>>=
1
;
k
-=
1
;
}
heap
->
nelts
-=
1
;
/* Unlink the max node. */
child
=
*
max
;
*
max
=
NULL
;
if
(
child
==
node
)
{
/* We're removing either the max or the last node in the tree. */
if
(
child
==
heap
->
min
)
{
heap
->
min
=
NULL
;
}
return
;
}
/* Replace the to be deleted node with the max node. */
child
->
left
=
node
->
left
;
child
->
right
=
node
->
right
;
child
->
parent
=
node
->
parent
;
if
(
child
->
left
!=
NULL
)
{
child
->
left
->
parent
=
child
;
}
if
(
child
->
right
!=
NULL
)
{
child
->
right
->
parent
=
child
;
}
if
(
node
->
parent
==
NULL
)
{
heap
->
min
=
child
;
}
else
if
(
node
->
parent
->
left
==
node
)
{
node
->
parent
->
left
=
child
;
}
else
{
node
->
parent
->
right
=
child
;
}
/* Walk down the subtree and check at each node if the heap property holds.
* It's a min heap so parent < child must be true. If the parent is bigger,
* swap it with the smallest child.
*/
for
(;;)
{
smallest
=
child
;
if
(
child
->
left
!=
NULL
&&
(
heap
->
compFn
)(
child
->
left
,
smallest
))
smallest
=
child
->
left
;
if
(
child
->
right
!=
NULL
&&
(
heap
->
compFn
)(
child
->
right
,
smallest
))
smallest
=
child
->
right
;
if
(
smallest
==
child
)
break
;
heapNodeSwap
(
heap
,
child
,
smallest
);
}
/* Walk up the subtree and check that each parent is less than the node
* this is required, because `max` node is not guaranteed to be the
* actual maximum in tree
*/
while
(
child
->
parent
!=
NULL
&&
(
heap
->
compFn
)(
child
,
child
->
parent
))
heapNodeSwap
(
heap
,
child
->
parent
,
child
);
}
void
heapDequeue
(
Heap
*
heap
)
{
heapRemove
(
heap
,
heap
->
min
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录