diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index f36f76fd8565966dae8e20a31cc2d6903b5d26db..957a8bbddab7d42828bbba3a74ac5ef9b5115eb6 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -87,6 +87,31 @@ void commitSync() throws SQLException; void close() throws SQLException; ``` + + + +```C# +ConsumerBuilder(IEnumerable> config) + +virtual IConsumer Build() + +Consumer(ConsumerBuilder builder) + +void Subscribe(IEnumerable topics) + +void Subscribe(string topic) + +ConsumeResult Consume(int millisecondsTimeout) + +List Subscription() + +void Unsubscribe() + +void Commit(ConsumeResult consumerResult) + +void Close() +``` + @@ -229,6 +254,30 @@ public class MetersDeserializer extends ReferenceDeserializer { } ``` + + + +```C# +using TDengineTMQ; + +// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、 +// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数 +var cfg = new ConsumerConfig + { + EnableAutoCommit = "true" + AutoCommitIntervalMs = "1000" + GourpId = "TDengine-TMQ-C#", + TDConnectUser = "root", + TDConnectPasswd = "taosdata", + AutoOffsetReset = "earliest" + MsgWithTableName = "true", + TDConnectIp = "127.0.0.1", + TDConnectPort = "6030" + }; + +var consumer = new ConsumerBuilder(cfg).Build(); +``` + @@ -262,6 +311,18 @@ consumer.subscribe(topics); + + +```C# +// 创建订阅 topics 列表 +List topics = new List(); +topics.add("tmq_topic"); +// 启动订阅 +consumer.Subscribe(topics); +``` + + + ## 消费 @@ -296,6 +357,23 @@ while(running){ + + +```C# +// 消费数据 +while (true) +{ + var consumerRes = consumer.Consume(100); + // process ConsumeResult + ProcessMsg(consumerRes); + consumer.Commit(consumerRes); +} +``` + + + + + ## 结束消费 消费结束后,应当取消订阅。 @@ -322,6 +400,18 @@ consumer.unsubscribe(); consumer.close(); ``` + + + + +```C# +// 取消订阅 +consumer.Unsubscribe(); + +// 关闭消费 +consumer.Close(); +``` +