From 9601c6db152a2d8cba4eebd33847cab3610412c5 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 25 Aug 2022 16:01:08 +0800 Subject: [PATCH] docs: update rust cloud tmq sample code (#16409) --- docs/en/09-data-out/_sub_rust.mdx | 2 +- .../cloud-example/examples/subscribe_demo.rs | 101 ++++++++++++++++++ 2 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 docs/examples/rust/cloud-example/examples/subscribe_demo.rs diff --git a/docs/en/09-data-out/_sub_rust.mdx b/docs/en/09-data-out/_sub_rust.mdx index 0021666a70..eb06c8f18c 100644 --- a/docs/en/09-data-out/_sub_rust.mdx +++ b/docs/en/09-data-out/_sub_rust.mdx @@ -1,3 +1,3 @@ ```rust -{{#include docs/examples/rust/nativeexample/examples/subscribe_demo.rs}} +{{#include docs/examples/rust/cloud-example/examples/subscribe_demo.rs}} ``` diff --git a/docs/examples/rust/cloud-example/examples/subscribe_demo.rs b/docs/examples/rust/cloud-example/examples/subscribe_demo.rs new file mode 100644 index 0000000000..5b20cabe2f --- /dev/null +++ b/docs/examples/rust/cloud-example/examples/subscribe_demo.rs @@ -0,0 +1,101 @@ +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +// Query options 2, use deserialization with serde. +#[derive(Debug, serde::Deserialize)] +#[allow(dead_code)] +struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, +} + +async fn prepare(taos: Taos) -> anyhow::Result<()> { + let inserted = taos.exec_many([ + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + assert_eq!(inserted, 6); + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taosws://localhost:6030"; + let builder = TaosBuilder::from_dsn(dsn)?; + + let taos = builder.build()?; + let db = "tmq"; + + // prepare database + taos.exec_many([ + format!("DROP TOPIC IF EXISTS tmq_meters"), + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + // create super table + format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"), + // create topic for subscription + format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}") + ]) + .await?; + + let task = tokio::spawn(prepare(taos)); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // subscribe + let tmq = TmqBuilder::from_dsn("taosws://localhost:6030/?group.id=test")?; + + let mut consumer = tmq.build()?; + consumer.subscribe(["tmq_meters"]).await?; + + { + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); + } + } + consumer.commit(offset).await?; + } + } + + consumer.unsubscribe().await; + + task.await??; + + Ok(()) +} -- GitLab