Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
dfda8098
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
dfda8098
编写于
8月 05, 2022
作者:
H
Hui Li
提交者:
GitHub
8月 05, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15783 from taosdata/test3.0/lihui
test: modify tmq example code for doc
上级
9d4fc3ab
94f66806
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
528 addition
and
727 deletion
+528
-727
docs/zh/07-develop/07-tmq.md
docs/zh/07-develop/07-tmq.md
+241
-254
examples/c/tmq.c
examples/c/tmq.c
+287
-473
未找到文件。
docs/zh/07-develop/07-tmq.md
浏览文件 @
dfda8098
---
sidebar_label
:
数据订阅
description
:
"
轻量级的数据订阅与推送服务。连续写入到
TDengine
中的时序数据能够被自动推送到订阅客户端。"
title
:
数据订阅
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import Java from "./_sub_java.mdx";
import Python from "./_sub_python.mdx";
import Go from "./_sub_go.mdx";
import Rust from "./_sub_rust.mdx";
import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";
基于数据天然的时间序列特性,TDengine 的数据写入(insert)与消息系统的数据发布(pub)逻辑上一致,均可视为系统中插入一条带时间戳的新记录。同时,TDengine 在内部严格按照数据时间序列单调递增的方式保存数据。本质上来说,TDengine 中每一张表均可视为一个标准的消息队列。
TDengine 内嵌支持轻量级的消息订阅与推送服务。使用系统提供的 API,用户可使用普通查询语句订阅数据库中的一张或多张表。订阅的逻辑和操作状态的维护均是由客户端完成,客户端定时轮询服务器是否有新的记录到达,有新的记录到达就会将结果反馈到客户。
TDengine 的订阅与推送服务的状态是由客户端维持,TDengine 服务端并不维持。因此如果应用重启,从哪个时间点开始获取最新数据,由应用决定。
TDengine 的 API 中,与订阅相关的主要有以下三个:
```
c
taos_subscribe
taos_consume
taos_unsubscribe
```
这些 API 的文档请见
[
C/C++ Connector
](
/reference/connector/cpp
)
,下面仍以智能电表场景为例介绍一下它们的具体用法(超级表和子表结构请参考上一节“连续查询”),完整的示例代码可以在
[
这里
](
https://github.com/taosdata/TDengine/blob/master/examples/c/subscribe.c
)
找到。
如果我们希望当某个电表的电流超过一定限制(比如 10A)后能得到通知并进行一些处理, 有两种方法:一是分别对每张子表进行查询,每次查询后记录最后一条数据的时间戳,后续只查询这个时间戳之后的数据:
```
sql
select
*
from
D1001
where
ts
>
{
last_timestamp1
}
and
current
>
10
;
select
*
from
D1002
where
ts
>
{
last_timestamp2
}
and
current
>
10
;
...
```
这确实可行,但随着电表数量的增加,查询数量也会增加,客户端和服务端的性能都会受到影响,当电表数增长到一定的程度,系统就无法承受了。
另一种方法是对超级表进行查询。这样,无论有多少电表,都只需一次查询:
```
sql
select
*
from
meters
where
ts
>
{
last_timestamp
}
and
current
>
10
;
```
但是,如何选择
`last_timestamp`
就成了一个新的问题。因为,一方面数据的产生时间(也就是数据时间戳)和数据入库的时间一般并不相同,有时偏差还很大;另一方面,不同电表的数据到达 TDengine 的时间也会有差异。所以,如果我们在查询中使用最慢的那台电表的数据的时间戳作为
`last_timestamp`
,就可能重复读入其它电表的数据;如果使用最快的电表的时间戳,其它电表的数据就可能被漏掉。
TDengine 的订阅功能为上面这个问题提供了一个彻底的解决方案。
首先是使用
`taos_subscribe`
创建订阅:
```
c
TAOS_SUB
*
tsub
=
NULL
;
if
(
async
)
{
// create an asynchronized subscription, the callback function will be called every 1s
tsub
=
taos_subscribe
(
taos
,
restart
,
topic
,
sql
,
subscribe_callback
,
&
blockFetch
,
1000
);
}
else
{
// create an synchronized subscription, need to call 'taos_consume' manually
tsub
=
taos_subscribe
(
taos
,
restart
,
topic
,
sql
,
NULL
,
NULL
,
0
);
}
```
TDengine 中的订阅既可以是同步的,也可以是异步的,上面的代码会根据从命令行获取的参数
`async`
的值来决定使用哪种方式。这里,同步的意思是用户程序要直接调用
`taos_consume`
来拉取数据,而异步则由 API 在内部的另一个线程中调用
`taos_consume`
,然后把拉取到的数据交给回调函数
`subscribe_callback`
去处理。(注意,
`subscribe_callback`
中不宜做较为耗时的操作,否则有可能导致客户端阻塞等不可控的问题。)
参数
`taos`
是一个已经建立好的数据库连接,在同步模式下无特殊要求。但在异步模式下,需要注意它不会被其它线程使用,否则可能导致不可预计的错误,因为回调函数在 API 的内部线程中被调用,而 TDengine 的部分 API 不是线程安全的。
参数
`sql`
是查询语句,可以在其中使用 where 子句指定过滤条件。在我们的例子中,如果只想订阅电流超过 10A 时的数据,可以这样写:
```
sql
select
*
from
meters
where
current
>
10
;
```
注意,这里没有指定起始时间,所以会读到所有时间的数据。如果只想从一天前的数据开始订阅,而不需要更早的历史数据,可以再加上一个时间条件:
```
sql
select
*
from
meters
where
ts
>
now
-
1
d
and
current
>
10
;
```
订阅的
`topic`
实际上是它的名字,因为订阅功能是在客户端 API 中实现的,所以没必要保证它全局唯一,但需要它在一台客户端机器上唯一。
如果名为
`topic`
的订阅不存在,参数
`restart`
没有意义;但如果用户程序创建这个订阅后退出,当它再次启动并重新使用这个
`topic`
时,
`restart`
就会被用于决定是从头开始读取数据,还是接续上次的位置进行读取。本例中,如果
`restart`
是
**true**
(非零值),用户程序肯定会读到所有数据。但如果这个订阅之前就存在了,并且已经读取了一部分数据,且
`restart`
是
**false**
(
**0**
),用户程序就不会读到之前已经读取的数据了。
`taos_subscribe`
的最后一个参数是以毫秒为单位的轮询周期。在同步模式下,如果前后两次调用
`taos_consume`
的时间间隔小于此时间,
`taos_consume`
会阻塞,直到间隔超过此时间。异步模式下,这个时间是两次调用回调函数的最小时间间隔。
`taos_subscribe`
的倒数第二个参数用于用户程序向回调函数传递附加参数,订阅 API 不对其做任何处理,只原样传递给回调函数。此参数在同步模式下无意义。
订阅创建以后,就可以消费其数据了,同步模式下,示例代码是下面的 else 部分:
```
c
if
(
async
)
{
getchar
();
}
else
while
(
1
)
{
TAOS_RES
*
res
=
taos_consume
(
tsub
);
if
(
res
==
NULL
)
{
printf
(
"failed to consume data."
);
break
;
}
else
{
print_result
(
res
,
blockFetch
);
getchar
();
}
}
```
这里是一个
**while**
循环,用户每按一次回车键就调用一次
`taos_consume`
,而
`taos_consume`
的返回值是查询到的结果集,与
`taos_use_result`
完全相同,例子中使用这个结果集的代码是函数
`print_result`
:
```
c
void
print_result
(
TAOS_RES
*
res
,
int
blockFetch
)
{
TAOS_ROW
row
=
NULL
;
int
num_fields
=
taos_num_fields
(
res
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
res
);
int
nRows
=
0
;
if
(
blockFetch
)
{
nRows
=
taos_fetch_block
(
res
,
&
row
);
for
(
int
i
=
0
;
i
<
nRows
;
i
++
)
{
char
temp
[
256
];
taos_print_row
(
temp
,
row
+
i
,
fields
,
num_fields
);
puts
(
temp
);
}
}
else
{
while
((
row
=
taos_fetch_row
(
res
)))
{
char
temp
[
256
];
taos_print_row
(
temp
,
row
,
fields
,
num_fields
);
puts
(
temp
);
nRows
++
;
}
}
printf
(
"%d rows consumed.
\n
"
,
nRows
);
}
```
其中的
`taos_print_row`
用于处理订阅到数据,在我们的例子中,它会打印出所有符合条件的记录。而异步模式下,消费订阅到的数据则显得更为简单:
```
c
void
subscribe_callback
(
TAOS_SUB
*
tsub
,
TAOS_RES
*
res
,
void
*
param
,
int
code
)
{
print_result
(
res
,
*
(
int
*
)
param
);
}
```
当要结束一次数据订阅时,需要调用
`taos_unsubscribe`
:
```
c
taos_unsubscribe
(
tsub
,
keep
);
```
其第二个参数,用于决定是否在客户端保留订阅的进度信息。如果这个参数是
**false**
(
**0**
),那无论下次调用
`taos_subscribe`
时的
`restart`
参数是什么,订阅都只能重新开始。另外,进度信息的保存位置是 _{DataDir}/subscribe/_ 这个目录下(注:
`taos.cfg`
配置文件中
`DataDir`
参数值默认为
**/var/lib/taos/**
,但是 Windows 服务器上本身不存在该目录,所以需要在 Windows 的配置文件中修改
`DataDir`
参数值为相应的已存在目录"),每个订阅有一个与其
`topic`
同名的文件,删掉某个文件,同样会导致下次创建其对应的订阅时只能重新开始。
代码介绍完毕,我们来看一下实际的运行效果。假设:
-
示例代码已经下载到本地
-
TDengine 也已经在同一台机器上安装好
-
示例所需的数据库、超级表、子表已经全部创建好
则可以在示例代码所在目录执行以下命令来编译并启动示例程序:
```
bash
make
./subscribe
-sql
=
'select * from meters where current > 10;'
```
示例程序启动后,打开另一个终端窗口,启动 TDengine CLI 向
**D1001**
插入一条电流为 12A 的数据:
```
sql
$
taos
>
use
test
;
>
insert
into
D1001
values
(
now
,
12
,
220
,
1
);
```
这时,因为电流超过了 10A,您应该可以看到示例程序将它输出到了屏幕上。您可以继续插入一些数据观察示例程序的输出。
## 示例程序
下面的示例程序展示是如何使用连接器订阅所有电流超过 10A 的记录。
### 准备数据
```
# create database "power"
taos> create database power;
# use "power" as the database in following operations
taos> use power;
# create super table "meters"
taos> create table meters(ts timestamp, current float, voltage int, phase int) tags(location binary(64), groupId int);
# create tabes using the schema defined by super table "meters"
taos> create table d1001 using meters tags ("California.SanFrancisco", 2);
taos> create table d1002 using meters tags ("California.LosAngeles", 2);
# insert some rows
taos> insert into d1001 values("2020-08-15 12:00:00.000", 12, 220, 1),("2020-08-15 12:10:00.000", 12.3, 220, 2),("2020-08-15 12:20:00.000", 12.2, 220, 1);
taos> insert into d1002 values("2020-08-15 12:00:00.000", 9.9, 220, 1),("2020-08-15 12:10:00.000", 10.3, 220, 1),("2020-08-15 12:20:00.000", 11.2, 220, 1);
# filter out the rows in which current is bigger than 10A
taos> select * from meters where current > 10;
ts | current | voltage | phase | location | groupid |
===========================================================================================================
2020-08-15 12:10:00.000 | 10.30000 | 220 | 1 | California.LosAngeles | 2 |
2020-08-15 12:20:00.000 | 11.20000 | 220 | 1 | California.LosAngeles | 2 |
2020-08-15 12:00:00.000 | 12.00000 | 220 | 1 | California.SanFrancisco | 2 |
2020-08-15 12:10:00.000 | 12.30000 | 220 | 2 | California.SanFrancisco | 2 |
2020-08-15 12:20:00.000 | 12.20000 | 220 | 1 | California.SanFrancisco | 2 |
Query OK, 5 row(s) in set (0.004896s)
```
### 示例代码
<Tabs
defaultValue=
"java"
groupId=
"lang"
>
<TabItem
label=
"Java"
value=
"java"
>
<Java
/>
</TabItem>
<TabItem
label=
"Python"
value=
"Python"
>
<Python
/>
</TabItem>
{/
*
<TabItem
label=
"Go"
value=
"go"
>
<Go/>
</TabItem>
*
/}
<TabItem
label=
"Rust"
value=
"rust"
>
<Rust
/>
</TabItem>
{/
*
<TabItem
label=
"Node.js"
value=
"nodejs"
>
<Node/>
</TabItem>
<TabItem
label=
"C#"
value=
"csharp"
>
<CSharp/>
</TabItem>
*
/}
<TabItem
label=
"C"
value=
"c"
>
<CDemo
/>
</TabItem>
</Tabs>
### 运行示例程序
示例程序会先消费符合查询条件的所有历史数据:
```
bash
ts: 1597464000000 current: 12.0 voltage: 220 phase: 1 location: California.SanFrancisco groupid : 2
ts: 1597464600000 current: 12.3 voltage: 220 phase: 2 location: California.SanFrancisco groupid : 2
ts: 1597465200000 current: 12.2 voltage: 220 phase: 1 location: California.SanFrancisco groupid : 2
ts: 1597464600000 current: 10.3 voltage: 220 phase: 1 location: California.LosAngeles groupid : 2
ts: 1597465200000 current: 11.2 voltage: 220 phase: 1 location: California.LosAngeles groupid : 2
```
接着,使用 TDengine CLI 向表中新增一条数据:
```
# taos
taos> use power;
taos> insert into d1001 values(now, 12.4, 220, 1);
```
因为这条数据的电流大于 10A,示例程序会将其消费:
```
ts: 1651146662805 current: 12.4 voltage: 220 phase: 1 location: California.SanFrancisco groupid: 2
```
---
sidebar_label
:
消息队列
description
:
"
数据订阅与推送服务。连续写入到
TDengine
中的时序数据能够被自动推送到订阅客户端。"
title
:
消息队列
---
基于数据天然的时间序列特性,TDengine 的数据写入(insert)与消息系统的数据发布(pub)逻辑上一致,均可视为系统中插入一条带时间戳的新记录。同时,TDengine 在内部严格按照数据时间序列单调递增的方式保存数据。本质上来说,TDengine 中每一张表均可视为一个标准的消息队列。
TDengine 内嵌支持消息订阅与推送服务(下文都简称TMQ)。使用系统提供的 API,用户可使用普通查询语句订阅数据库中的一张或多张表,或整个库。客户端启动订阅后,定时或按需轮询服务器是否有新的记录到达,有新的记录到达就会将结果反馈到客户。
TMQ提供了提交机制来保证消息队列的可靠性和正确性。在调用方法上,支持自动提交和手动提交。
TMQ 的 API 中,与订阅相关的主要数据结构和API如下:
```
c
typedef
struct
tmq_t
tmq_t
;
typedef
struct
tmq_conf_t
tmq_conf_t
;
typedef
struct
tmq_list_t
tmq_list_t
;
typedef
void
(
tmq_commit_cb
(
tmq_t
*
,
int32_t
code
,
void
*
param
));
DLL_EXPORT
tmq_list_t
*
tmq_list_new
();
DLL_EXPORT
int32_t
tmq_list_append
(
tmq_list_t
*
,
const
char
*
);
DLL_EXPORT
void
tmq_list_destroy
(
tmq_list_t
*
);
DLL_EXPORT
tmq_t
*
tmq_consumer_new
(
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
const
char
*
tmq_err2str
(
int32_t
code
);
DLL_EXPORT
int32_t
tmq_subscribe
(
tmq_t
*
tmq
,
const
tmq_list_t
*
topic_list
);
DLL_EXPORT
int32_t
tmq_unsubscribe
(
tmq_t
*
tmq
);
DLL_EXPORT
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
timeout
);
DLL_EXPORT
int32_t
tmq_consumer_close
(
tmq_t
*
tmq
);
DLL_EXPORT
int32_t
tmq_commit_sync
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
);
DLL_EXPORT
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
,
tmq_commit_cb
*
cb
,
void
*
param
);
enum
tmq_conf_res_t
{
TMQ_CONF_UNKNOWN
=
-
2
,
TMQ_CONF_INVALID
=
-
1
,
TMQ_CONF_OK
=
0
,
};
typedef
enum
tmq_conf_res_t
tmq_conf_res_t
;
DLL_EXPORT
tmq_conf_t
*
tmq_conf_new
();
DLL_EXPORT
tmq_conf_res_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
);
DLL_EXPORT
void
tmq_conf_destroy
(
tmq_conf_t
*
conf
);
DLL_EXPORT
void
tmq_conf_set_auto_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
,
void
*
param
);
```
这些 API 的文档请见
[
C/C++ Connector
](
/reference/connector/cpp
)
,下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码可以在
[
tmq.c
](
https://github.com/taosdata/TDengine/blob/3.0/examples/c/tmq.c
)
看到。
一、首先完成建库、建一张超级表和多张子表,并每个子表插入若干条数据记录:
```
sql
drop
database
if
exists
tmqdb
;
create
database
tmqdb
;
create
table
tmqdb
.
stb
(
ts
timestamp
,
c1
int
,
c2
float
,
c3
varchar
(
16
)
tags
(
t1
int
,
t3
varchar
(
16
));
create
table
tmqdb
.
ctb0
using
tmqdb
.
stb
tags
(
0
,
"subtable0"
);
create
table
tmqdb
.
ctb1
using
tmqdb
.
stb
tags
(
1
,
"subtable1"
);
create
table
tmqdb
.
ctb2
using
tmqdb
.
stb
tags
(
2
,
"subtable2"
);
create
table
tmqdb
.
ctb3
using
tmqdb
.
stb
tags
(
3
,
"subtable3"
);
insert
into
tmqdb
.
ctb0
values
(
now
,
0
,
0
,
'a0'
)(
now
+
1
s
,
0
,
0
,
'a00'
);
insert
into
tmqdb
.
ctb1
values
(
now
,
1
,
1
,
'a1'
)(
now
+
1
s
,
11
,
11
,
'a11'
);
insert
into
tmqdb
.
ctb2
values
(
now
,
2
,
2
,
'a1'
)(
now
+
1
s
,
22
,
22
,
'a22'
);
insert
into
tmqdb
.
ctb3
values
(
now
,
3
,
3
,
'a1'
)(
now
+
1
s
,
33
,
33
,
'a33'
);
```
二、创建topic:
```
sql
create
topic
topicName
as
select
ts
,
c1
,
c2
,
c3
from
tmqdb
.
stb
where
c1
>
1
;
```
注:TMQ支持多种订阅类型:
1、列订阅
语法:CREATE TOPIC topic_name as subquery
通过select语句订阅(包括select
*
,或select ts, c1等指定列描述订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)
-
TOPIC一旦创建则schema确定
-
被订阅或用于计算的column和tag不可被删除、修改
-
若发生schema变更,新增的column不出现在结果中
2、超级表订阅
语法:CREATE TOPIC topic_name AS STABLE stbName
-
订阅某超级表的全部数据,schema变更不受限,schema变更后写入的数据将以最新schema返回
-
在tmq的返回消息中schema是块级别的,每块的schema可能不一样
-
列变更后写入的数据若未落盘,将以写入时的schema返回
-
列变更后写入的数据若已落盘,将以落盘时的schema返回
3、db订阅
语法:CREATE TOPIC topic_name AS DATABASE db_name
-
订阅某一db的全部数据,schema变更不受限
-
在tmq的返回消息中schema是块级别的,每块的schema可能不一样
-
列变更后写入的数据若未落盘,将以写入时的schema返回
-
列变更后写入的数据若已落盘,将以落盘时的schema返回
三、创建consumer
目前支持的config:
| 参数名称 | 参数值 | 备注 |
| ---------------------------- | ------------------------------ | ------------------------------------------------------ |
| group.id | 最大长度:192 | |
| enable.auto.commit | 合法值:true, false | |
| auto.commit.interval.ms | | |
| auto.offset.reset | 合法值:earliest, latest, none | |
| td.connect.ip | 用于连接,同taos_connect的参数 | |
| td.connect.user | 用于连接,同taos_connect的参数 | |
| td.connect.pass | 用于连接,同taos_connect的参数 | |
| td.connect.port | 用于连接,同taos_connect的参数 | |
| enable.heartbeat.background | 合法值:true, false | 开启后台心跳,即consumer不会因为长时间不poll而认为离线 |
| experimental.snapshot.enable | 合法值:true, false | 从wal开始消费,还是从tsbs开始消费 |
| msg.with.table.name | 合法值:true, false | 从消息中能否解析表名 |
```
sql
/* 根据需要,设置消费组(group.id)、自动提交(enable.auto.commit)、自动提交时间间隔(auto.commit.interval.ms)、用户名(td.connect.user)、密码(td.connect.pass)等参数 */
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"auto.commit.interval.ms"
,
"1000"
);
tmq_conf_set
(
conf
,
"group.id"
,
"cgrpName"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"earliest"
);
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"true"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
tmq_conf_destroy
(
conf
);
return
tmq
;
```
四、创建订阅主题列表
```
sql
tmq_list_t
*
topicList
=
tmq_list_new
();
tmq_list_append
(
topicList
,
"topicName"
);
return
topicList
;
```
单个consumer支持同时订阅多个topic。
五、启动订阅并开始消费
```
sql
/* 启动订阅 */
tmq_subscribe
(
tmq
,
topicList
);
tmq_list_destroy
(
topicList
);
/* 循环poll消息 */
int32_t
totalRows
=
0
;
int32_t
msgCnt
=
0
;
int32_t
consumeDelay
=
5000
;
while
(
running
)
{
TAOS_RES
*
tmqmsg
=
tmq_consumer_poll
(
tmq
,
consumeDelay
);
if
(
tmqmsg
)
{
msgCnt
++
;
totalRows
+=
msg_process
(
tmqmsg
);
taos_free_result
(
tmqmsg
);
}
else
{
break
;
}
}
fprintf
(
stderr
,
"%d msg consumed, include %d rows
\n
"
,
msgCnt
,
totalRows
);
```
这里是一个
**while**
循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析:
```
sql
static
int32_t
msg_process
(
TAOS_RES
*
msg
)
{
char
buf
[
1024
];
int32_t
rows
=
0
;
const
char
*
topicName
=
tmq_get_topic_name
(
msg
);
const
char
*
dbName
=
tmq_get_db_name
(
msg
);
int32_t
vgroupId
=
tmq_get_vgroup_id
(
msg
);
printf
(
"topic: %s
\n
"
,
topicName
);
printf
(
"db: %s
\n
"
,
dbName
);
printf
(
"vgroup id: %d
\n
"
,
vgroupId
);
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
int32_t
numOfFields
=
taos_field_count
(
msg
);
int32_t
*
length
=
taos_fetch_lengths
(
msg
);
int32_t
precision
=
taos_result_precision
(
msg
);
const
char
*
tbName
=
tmq_get_table_name
(
msg
);
rows
++
;
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
printf
(
"row content from %s: %s
\n
"
,
(
tbName
!=
NULL
?
tbName
:
"null table"
),
buf
);
}
return
rows
;
}
```
五、结束消费
```
sql
/* 取消订阅 */
tmq_unsubscribe
(
tmq
);
/* 关闭消费 */
tmq_consumer_close
(
tmq
);
```
六、删除topic
如果不再需要,可以删除创建topic,但注意:只有没有被订阅的topic才能别删除。
```
sql
/* 删除topic */
drop
topic
topicName
;
```
七、状态查看
1、topics:查询已经创建的topic
```
sql
show
topics
;
```
2、consumers:查询consumer的状态及其订阅的topic
```
sql
show
consumers
;
```
3、subscriptions:查询consumer与vgroup之间的分配关系
```
sql
show
subscriptions
;
```
examples/c/tmq.c
浏览文件 @
dfda8098
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
static
int
running
=
1
;
static
void
msg_process
(
TAOS_RES
*
msg
)
{
char
buf
[
1024
];
/*memset(buf, 0, 1024);*/
printf
(
"topic: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"db: %s
\n
"
,
tmq_get_db_name
(
msg
));
printf
(
"vg: %d
\n
"
,
tmq_get_vgroup_id
(
msg
));
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_TABLE_META
)
{
tmq_raw_data
raw
=
{
0
};
int32_t
code
=
tmq_get_raw
(
msg
,
&
raw
);
if
(
code
==
0
)
{
TAOS
*
pConn
=
taos_connect
(
"192.168.1.86"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups 5"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
;
}
taos_free_result
(
pRes
);
int32_t
ret
=
tmq_write_raw
(
pConn
,
raw
);
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
taos_close
(
pConn
);
}
char
*
result
=
tmq_get_json_meta
(
msg
);
if
(
result
)
{
printf
(
"meta result: %s
\n
"
,
result
);
}
tmq_free_json_meta
(
result
);
return
;
}
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
int32_t
numOfFields
=
taos_field_count
(
msg
);
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
printf
(
"%s
\n
"
,
buf
);
const
char
*
tbName
=
tmq_get_table_name
(
msg
);
if
(
tbName
)
{
printf
(
"from tb: %s
\n
"
,
tbName
);
}
}
}
int32_t
init_env
()
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups 5"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct0 using st1 tags(1000,
\"
ttt
\"
, true)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table tu1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct0 values(now, 1, 2, 'a')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct1 using st1(t1) tags(2000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct2 using st1(t1) tags(NULL)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct1 values(now, 3, 4, 'b')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct3 using st1(t1) tags(3000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(now, 5, 6, 'c')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
#if 0
pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table ct3 set tag t1=5000");
if (taos_errno(pRes) != 0) {
printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table ct3 ct1");
if (taos_errno(pRes) != 0) {
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
if (taos_errno(pRes) != 0) {
printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 add column c3 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 rename column c3 cc3");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 comment 'hello'");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 drop column c1");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table n1");
if (taos_errno(pRes) != 0) {
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt2 using jt tags('')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
#endif
return
0
;
}
int32_t
create_topic
()
{
printf
(
"create topic
\n
"
);
TAOS_RES
*
pRes
;
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
// pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c3 from st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic topic2 as select ts, c1, c2, c3 from st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
#if 0
pRes = taos_query(pConn, "insert into tu1 values(now, 1, 1.0, 'bi1')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu1 values(now+1d, 1, 1.0, 'bi1')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu2 values(now, 2, 2.0, 'bi2')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu2 values(now+1d, 2, 2.0, 'bi2')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
#endif
taos_close
(
pConn
);
return
0
;
}
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
int32_t
code
,
void
*
param
)
{
printf
(
"commit %d tmq %p param %p
\n
"
,
code
,
tmq
,
param
);
}
tmq_t
*
build_consumer
()
{
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"client.id"
,
"my app 1"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
tmq_list_t
*
build_topic_list
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
tmq_list_append
(
topic_list
,
"topic_ctb_column"
);
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return
topic_list
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
printf
(
"subscribe err
\n
"
);
return
;
}
int32_t
cnt
=
0
;
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
-
1
);
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
/*if (cnt >= 2) break;*/
/*printf("get data\n");*/
taos_free_result
(
tmqmessage
);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
sync_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
static
const
int
MIN_COMMIT_COUNT
=
1
;
int
msg_count
=
0
;
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
return
;
}
tmq_list_t
*
subList
=
NULL
;
tmq_subscription
(
tmq
,
&
subList
);
char
**
subTopics
=
tmq_list_to_c_array
(
subList
);
int32_t
sz
=
tmq_list_get_size
(
subList
);
printf
(
"subscribed topics: "
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
printf
(
"%s, "
,
subTopics
[
i
]);
}
printf
(
"
\n
"
);
tmq_list_destroy
(
subList
);
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
msg_process
(
tmqmessage
);
taos_free_result
(
tmqmessage
);
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
>
1
)
{
printf
(
"env init
\n
"
);
if
(
init_env
()
<
0
)
{
return
-
1
;
}
create_topic
();
}
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
basic_consume_loop
(
tmq
,
topic_list
);
/*sync_consume_loop(tmq, topic_list);*/
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
static
int
running
=
1
;
static
char
dbName
[
64
]
=
"tmqdb"
;
static
char
stbName
[
64
]
=
"stb"
;
static
char
topicName
[
64
]
=
"topicname"
;
static
int32_t
msg_process
(
TAOS_RES
*
msg
)
{
char
buf
[
1024
];
int32_t
rows
=
0
;
const
char
*
topicName
=
tmq_get_topic_name
(
msg
);
const
char
*
dbName
=
tmq_get_db_name
(
msg
);
int32_t
vgroupId
=
tmq_get_vgroup_id
(
msg
);
printf
(
"topic: %s
\n
"
,
topicName
);
printf
(
"db: %s
\n
"
,
dbName
);
printf
(
"vgroup id: %d
\n
"
,
vgroupId
);
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
int32_t
numOfFields
=
taos_field_count
(
msg
);
int32_t
*
length
=
taos_fetch_lengths
(
msg
);
int32_t
precision
=
taos_result_precision
(
msg
);
const
char
*
tbName
=
tmq_get_table_name
(
msg
);
rows
++
;
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
printf
(
"row content from %s: %s
\n
"
,
(
tbName
!=
NULL
?
tbName
:
"null table"
),
buf
);
}
return
rows
;
}
static
int32_t
init_env
()
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
TAOS_RES
*
pRes
;
// drop database if exists
printf
(
"create database
\n
"
);
pRes
=
taos_query
(
pConn
,
"drop database if exists tmqdb"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop tmqdb, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
// create database
pRes
=
taos_query
(
pConn
,
"create database tmqdb"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create tmqdb, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
// create super table
printf
(
"create super table
\n
"
);
pRes
=
taos_query
(
pConn
,
"create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table stb, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
// create sub tables
printf
(
"create sub tables
\n
"
);
pRes
=
taos_query
(
pConn
,
"create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table ctb0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table ctb1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table ctb2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table ctb3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
// insert data
printf
(
"insert data into sub tables
\n
"
);
pRes
=
taos_query
(
pConn
,
"insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ctb0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ctb0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ctb0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ctb0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
int32_t
create_topic
()
{
printf
(
"create topic
\n
"
);
TAOS_RES
*
pRes
;
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
pRes
=
taos_query
(
pConn
,
"use tmqdb"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use tmqdb, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
// pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
pRes
=
taos_query
(
pConn
,
"create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topicname, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
int32_t
code
,
void
*
param
)
{
printf
(
"tmq_commit_cb_print() code: %d, tmq: %p, param: %p
\n
"
,
code
,
tmq
,
param
);
}
tmq_t
*
build_consumer
()
{
tmq_conf_res_t
code
;
tmq_conf_t
*
conf
=
tmq_conf_new
();
code
=
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
code
=
tmq_conf_set
(
conf
,
"auto.commit.interval.ms"
,
"1000"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
code
=
tmq_conf_set
(
conf
,
"group.id"
,
"cgrpName"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
code
=
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
code
=
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
code
=
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"earliest"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
code
=
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"true"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
code
=
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
tmq_list_t
*
build_topic_list
()
{
tmq_list_t
*
topicList
=
tmq_list_new
();
int32_t
code
=
tmq_list_append
(
topicList
,
"topicname"
);
if
(
code
)
{
return
NULL
;
}
return
topicList
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topicList
)
{
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topicList
)))
{
fprintf
(
stderr
,
"%% Failed to tmq_subscribe(): %s
\n
"
,
tmq_err2str
(
code
));
return
;
}
int32_t
totalRows
=
0
;
int32_t
msgCnt
=
0
;
int32_t
consumeDelay
=
5000
;
while
(
running
)
{
TAOS_RES
*
tmqmsg
=
tmq_consumer_poll
(
tmq
,
consumeDelay
);
if
(
tmqmsg
)
{
msgCnt
++
;
totalRows
+=
msg_process
(
tmqmsg
);
taos_free_result
(
tmqmsg
);
}
else
{
break
;
}
}
fprintf
(
stderr
,
"%d msg consumed, include %d rows
\n
"
,
msgCnt
,
totalRows
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
int32_t
code
;
if
(
init_env
()
<
0
)
{
return
-
1
;
}
if
(
create_topic
()
<
0
)
{
return
-
1
;
}
tmq_t
*
tmq
=
build_consumer
();
if
(
NULL
==
tmq
)
{
fprintf
(
stderr
,
"%% build_consumer() fail!
\n
"
);
return
-
1
;
}
tmq_list_t
*
topic_list
=
build_topic_list
();
if
(
NULL
==
topic_list
)
{
return
-
1
;
}
basic_consume_loop
(
tmq
,
topic_list
);
code
=
tmq_unsubscribe
(
tmq
);
if
(
code
)
{
fprintf
(
stderr
,
"%% Failed to unsubscribe: %s
\n
"
,
tmq_err2str
(
code
));
}
else
{
fprintf
(
stderr
,
"%% unsubscribe
\n
"
);
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
{
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
}
else
{
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录