提交 5ecf860f 编写于 作者: D dingbo

test: subscribe with rust

上级 00ac9049
...@@ -7,3 +7,7 @@ edition = "2021" ...@@ -7,3 +7,7 @@ edition = "2021"
taos = { version = "*", default-features = false, features = ["ws"] } taos = { version = "*", default-features = false, features = ["ws"] }
tokio = { version = "1", features = ["full"]} tokio = { version = "1", features = ["full"]}
anyhow = "1.0.0" anyhow = "1.0.0"
serde = { version = "1", features = ["derive"]}
chrono = "*"
pretty_env_logger = "0.4"
log = "*"
...@@ -18,6 +18,7 @@ struct Record { ...@@ -18,6 +18,7 @@ struct Record {
async fn prepare(taos: Taos) -> anyhow::Result<()> { async fn prepare(taos: Taos) -> anyhow::Result<()> {
let inserted = taos.exec_many([ let inserted = taos.exec_many([
"use tmq",
// create child table // create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
// insert into child table // insert into child table
...@@ -35,22 +36,20 @@ async fn prepare(taos: Taos) -> anyhow::Result<()> { ...@@ -35,22 +36,20 @@ async fn prepare(taos: Taos) -> anyhow::Result<()> {
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
let mut dsn = std::env::var("TDENGINE_CLOUD_DSN").parse()?; std::env::set_var("RUST_LOG", "debug");
let builder = TaosBuilder::from_dsn(dsn)?; pretty_env_logger::init();
let dsn = std::env::var("TDENGINE_CLOUD_DSN")?;
let builder = TaosBuilder::from_dsn(&dsn)?;
let taos = builder.build()?; let taos = builder.build()?;
let db = "tmq";
// prepare database // prepare database
taos.exec_many([ taos.exec_many([
format!("DROP TOPIC IF EXISTS tmq_meters"), "DROP TOPIC IF EXISTS tmq_meters",
format!("DROP DATABASE IF EXISTS `{db}`"), "USE tmq",
format!("CREATE DATABASE `{db}`"), "CREATE STABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))",
format!("USE `{db}`"), "CREATE TOPIC tmq_meters with META AS DATABASE tmq"
// create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"),
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
]) ])
.await?; .await?;
...@@ -59,10 +58,13 @@ async fn main() -> anyhow::Result<()> { ...@@ -59,10 +58,13 @@ async fn main() -> anyhow::Result<()> {
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
// subscribe // subscribe
let tmq = TmqBuilder::from_dsn("taosws://localhost:6030/?group.id=test")?; let dsn2 = format!("{dsn}&group.id=test");
dbg!(&dsn2);
let tmq = TmqBuilder::from_dsn(dsn2)?;
let mut consumer = tmq.build()?; let mut consumer = tmq.build()?;
consumer.subscribe(["tmq_meters"]).await?; consumer.subscribe(["tmq_meters"]).await?;
println!("start subscription");
{ {
let mut stream = consumer.stream(); let mut stream = consumer.stream();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册