未验证 提交 abf3204b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18257 from taosdata/docs/TD-20499-rust-tmq-demo

docs: update tmq demo
......@@ -10,4 +10,4 @@ chrono = "0.4"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
taos = { version = "0.*" }
taos = { version = "0.4.8" }
......@@ -12,7 +12,10 @@ async fn main() -> anyhow::Result<()> {
// bind table name and tags
stmt.set_tbname_tags(
"d1001",
&[Value::VarChar("California.SanFransico".into()), Value::Int(2)],
&[
Value::VarChar("California.SanFransico".into()),
Value::Int(2),
],
)?;
// bind values.
let values = vec![
......
......@@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
// create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`")
])
.await?;
......@@ -64,13 +64,9 @@ async fn main() -> anyhow::Result<()> {
let mut consumer = tmq.build()?;
consumer.subscribe(["tmq_meters"]).await?;
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
consumer
.stream()
.try_for_each(|(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
......@@ -78,20 +74,14 @@ async fn main() -> anyhow::Result<()> {
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
consumer.commit(offset).await?;
}
}
Ok(())
})
.await?;
consumer.unsubscribe().await;
......
......@@ -5,7 +5,6 @@ async fn main() -> anyhow::Result<()> {
let dsn = "ws://";
let taos = TaosBuilder::from_dsn(dsn)?.build()?;
taos.exec_many([
"DROP DATABASE IF EXISTS power",
"CREATE DATABASE power",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册