// Program.cs
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using TDengineDriver;
using TDengineTMQ;

namespace TMQExample
{
    /// <summary>
    /// Console walkthrough of a TDengine TMQ subscription: create a topic,
    /// build a consumer, subscribe, poll and print a few batches, commit,
    /// list the current subscription, then unsubscribe and release everything.
    /// </summary>
    internal class SubscribeDemo
    {
        /// <summary>
        /// Entry point. Runs the whole demo against the server reached by
        /// <see cref="GetConnection"/>.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        /// <exception cref="InvalidOperationException">
        /// Connecting or creating the topic failed.
        /// </exception>
        static void Main(string[] args)
        {
            IntPtr conn = GetConnection();
            try
            {
                string topic = "topic_example";
                CreateTopic(conn, topic);

                var cfg = new ConsumerConfig
                {
                    // "GourpId" is the property name as spelled by the driver.
                    GourpId = "group_1",
                    TDConnectUser = "root",
                    TDConnectPasswd = "taosdata",
                    MsgWithTableName = "true",
                    TDConnectIp = "127.0.0.1",
                };

                // create consumer
                var consumer = new ConsumerBuilder(cfg).Build();
                try
                {
                    // subscribe
                    consumer.Subscribe(topic);

                    // consume: poll five times, passing 300 as the Consume argument
                    // (presumably a timeout in milliseconds; confirm against the driver)
                    for (int i = 0; i < 5; i++)
                    {
                        var consumeRes = consumer.Consume(300);

                        // print consumeResult
                        foreach (var kv in consumeRes.Message)
                        {
                            Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());

                            // header row: one cell per column meta
                            kv.Value.Metas.ForEach(meta =>
                            {
                                Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
                            });
                            Console.WriteLine("");

                            kv.Value.Datas.ForEach(data =>
                            {
                                Console.WriteLine(data.ToString());
                            });
                        }

                        consumer.Commit(consumeRes);
                        Console.WriteLine("\n================ {0} done ", i);
                    }

                    // retrieve topic list
                    List<string> topics = consumer.Subscription();
                    topics.ForEach(t => Console.WriteLine("topic name:{0}", t));

                    // unsubscribe
                    consumer.Unsubscribe();
                }
                finally
                {
                    // Close the consumer after use, otherwise it leaks memory;
                    // the finally block makes this happen on failure paths too.
                    consumer.Close();
                }
            }
            finally
            {
                TDengine.Close(conn);
            }
        }

        /// <summary>
        /// Issues "create topic if not exists" for <paramref name="topic"/> over
        /// <paramref name="conn"/> and releases the query result handle.
        /// </summary>
        /// <param name="conn">Open connection handle.</param>
        /// <param name="topic">Topic name, interpolated into the statement as-is.</param>
        /// <exception cref="InvalidOperationException">
        /// The server reported a non-zero error code.
        /// </exception>
        private static void CreateTopic(IntPtr conn, string topic)
        {
            //create topic
            IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from meters");
            try
            {
                if (TDengine.ErrorNo(res) != 0)
                {
                    // Read the error text before the handle is freed in finally.
                    throw new InvalidOperationException($"create topic failed, reason:{TDengine.Error(res)}");
                }
            }
            finally
            {
                // The result handle must be released on both success and failure.
                if (res != IntPtr.Zero)
                {
                    TDengine.FreeResult(res);
                }
            }
        }

        /// <summary>
        /// Connects to database "power" on localhost:6030 as root.
        /// </summary>
        /// <returns>A non-zero connection handle; the caller must close it.</returns>
        /// <exception cref="InvalidOperationException">
        /// The driver returned a null handle.
        /// </exception>
        static IntPtr GetConnection()
        {
            string host = "localhost";
            short port = 6030;
            string username = "root";
            string password = "taosdata";
            string dbname = "power";

            var conn = TDengine.Connect(host, username, password, dbname, port);
            if (conn == IntPtr.Zero)
            {
                throw new InvalidOperationException("Connect to TDengine failed");
            }

            Console.WriteLine("Connect to TDengine success");
            return conn;
        }
    }
}