Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9601c6db
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
9601c6db
编写于
8月 25, 2022
作者:
sangshuduo
提交者:
GitHub
8月 25, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
docs: update rust cloud tmq sample code (#16409)
上级
542e864c
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
102 addition
and
1 deletion
+102
-1
docs/en/09-data-out/_sub_rust.mdx
docs/en/09-data-out/_sub_rust.mdx
+1
-1
docs/examples/rust/cloud-example/examples/subscribe_demo.rs
docs/examples/rust/cloud-example/examples/subscribe_demo.rs
+101
-0
未找到文件。
docs/en/09-data-out/_sub_rust.mdx
浏览文件 @
9601c6db
```rust
```rust
{{#include docs/examples/rust/
native
example/examples/subscribe_demo.rs}}
{{#include docs/examples/rust/
cloud-
example/examples/subscribe_demo.rs}}
```
```
docs/examples/rust/cloud-example/examples/subscribe_demo.rs
0 → 100644
浏览文件 @
9601c6db
use
std
::
time
::
Duration
;
use
chrono
::{
DateTime
,
Local
};
use
taos
::
*
;
// Query options 2, use deserialization with serde.
#[derive(Debug,
serde::Deserialize)]
#[allow(dead_code)]
struct
Record
{
// deserialize timestamp to chrono::DateTime<Local>
ts
:
DateTime
<
Local
>
,
// float to f32
current
:
Option
<
f32
>
,
// int to i32
voltage
:
Option
<
i32
>
,
phase
:
Option
<
f32
>
,
}
async
fn
prepare
(
taos
:
Taos
)
->
anyhow
::
Result
<
()
>
{
let
inserted
=
taos
.exec_many
([
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')"
,
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)"
,
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)"
,
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)"
,
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)"
,
])
.await
?
;
assert_eq!
(
inserted
,
6
);
Ok
(())
}
#[tokio::main]
async
fn
main
()
->
anyhow
::
Result
<
()
>
{
let
dsn
=
"taosws://localhost:6030"
;
let
builder
=
TaosBuilder
::
from_dsn
(
dsn
)
?
;
let
taos
=
builder
.build
()
?
;
let
db
=
"tmq"
;
// prepare database
taos
.exec_many
([
format!
(
"DROP TOPIC IF EXISTS tmq_meters"
),
format!
(
"DROP DATABASE IF EXISTS `{db}`"
),
format!
(
"CREATE DATABASE `{db}`"
),
format!
(
"USE `{db}`"
),
// create super table
format!
(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
),
// create topic for subscription
format!
(
"CREATE TOPIC tmq_meters with META AS DATABASE {db}"
)
])
.await
?
;
let
task
=
tokio
::
spawn
(
prepare
(
taos
));
tokio
::
time
::
sleep
(
Duration
::
from_secs
(
1
))
.await
;
// subscribe
let
tmq
=
TmqBuilder
::
from_dsn
(
"taosws://localhost:6030/?group.id=test"
)
?
;
let
mut
consumer
=
tmq
.build
()
?
;
consumer
.subscribe
([
"tmq_meters"
])
.await
?
;
{
let
mut
stream
=
consumer
.stream
();
while
let
Some
((
offset
,
message
))
=
stream
.try_next
()
.await
?
{
// get information from offset
// the topic
let
topic
=
offset
.topic
();
// the vgroup id, like partition id in kafka.
let
vgroup_id
=
offset
.vgroup_id
();
println!
(
"* in vgroup id {vgroup_id} of topic {topic}
\n
"
);
if
let
Some
(
data
)
=
message
.into_data
()
{
while
let
Some
(
block
)
=
data
.fetch_raw_block
()
.await
?
{
// one block for one table, get table name if needed
let
name
=
block
.table_name
();
let
records
:
Vec
<
Record
>
=
block
.deserialize
()
.try_collect
()
?
;
println!
(
"** table: {}, got {} records: {:#?}
\n
"
,
name
.unwrap
(),
records
.len
(),
records
);
}
}
consumer
.commit
(
offset
)
.await
?
;
}
}
consumer
.unsubscribe
()
.await
;
task
.await
??
;
Ok
(())
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录