SubscribeSample.cs 4.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
using System;
using TDengineDriver;
using Sample.UtilsTools;
using System.Runtime.InteropServices;
using System.Collections.Generic;
using System.Threading;

namespace Example
{

    public class SubscribeSample
    {
        long ts = 1646150410000;
        public void RunSubscribeWithCallback(IntPtr conn, string table)
        {
            PrepareData(conn, table);
            string topic = $"{table}_topic";
            string sql = $"select * from {table}";
            SubscribeCallback subscribeCallback = new SubscribeCallback(SubCallback);
            
            // Subscribe from earliest timestamp in the table.
            IntPtr subscribe = TDengine.Subscribe(conn, true, topic, sql, subscribeCallback, IntPtr.Zero, 1000);

            // Add new data.
            for (int i = 0; i < 4; i++)
            {
                InsertData(conn, table);
            }
            Console.WriteLine("Unsubscribe and keep the subscribe progress ");
            TDengine.Unsubscribe(subscribe, true);
            
            Console.WriteLine("Subscribe from last subscribe progress");
            subscribe = TDengine.Subscribe(conn, false, topic, sql, subscribeCallback, IntPtr.Zero, 1000);
            for (int i = 0; i < 4; i++)
            {
                InsertData(conn, table);
            }
            Console.WriteLine("Unsubscribe and remove the subscribe progress ");
            TDengine.Unsubscribe(subscribe, false);
        }

        public void RunSubscribeWithoutCallback(IntPtr conn, string table)
        {
            
            PrepareData(conn, table);
            string topic = $"{table}_topic";
            string sql = $"select * from {table}";
            IntPtr subscribe = TDengine.Subscribe(conn, true, topic, sql, null, IntPtr.Zero, 1000);
            Console.WriteLine("consume from begin");
            //Don't release this TAO_RES and end of the subscribe application,other wise will lead crash.
            IntPtr taosRes = TDengine.Consume(subscribe);
            UtilsTools.DisplayRes(taosRes);
            for (int i = 0; i < 3; i++)
            {
                InsertData(conn, table);
            }
            Console.WriteLine("consume new data");
            taosRes = TDengine.Consume(subscribe);
            UtilsTools.DisplayRes(taosRes);
            Console.WriteLine("Unsubscribe and keep progress");
            TDengine.Unsubscribe(subscribe, true);

            // Subscribe from last subscribe progress. 
            subscribe = TDengine.Subscribe(conn, false, topic, sql, null, IntPtr.Zero, 1000);           
            for (int i = 0; i < 3; i++)
            {
                InsertData(conn, table);
                Console.WriteLine($"Consume {i+1} time");
                taosRes = TDengine.Consume(subscribe);
                // The interval between two consume should greater than "interval" pass in Subscribe().
                // Otherwise consume will be blocked.
                Thread.Sleep(1000);
                UtilsTools.DisplayRes(taosRes);
            }
            TDengine.Unsubscribe(subscribe, false);
            TDengine.FreeResult(taosRes);
        }

        public void SubCallback(IntPtr subscribe, IntPtr taosRes, IntPtr param, int code)
        {
            if (code == 0 && taosRes != IntPtr.Zero)
            {
                // cannot free taosRes using taosRes, otherwise will cause crash.
                Console.WriteLine($"Display taosRes in subscribe callback");
                UtilsTools.DisplayRes(taosRes);
            }
            else
            {
                Console.WriteLine($"async query data failed, failed code {code}");
            }

        }
        public void PrepareData(IntPtr conn, string tableName)
        {
            string createTable = $"create table if not exists {tableName} (ts timestamp,i8 tinyint,i16 smallint,i32 int,i64 bigint);";
            string insert1 = $"insert into {tableName} values({ts},1,2,3,4)";

            UtilsTools.ExecuteUpdate(conn, createTable);
            UtilsTools.ExecuteUpdate(conn, insert1);
        }

        public void InsertData(IntPtr conn, string tableName)
        {
            ts = ts + 100;
            string insert1 = $"insert into {tableName} values({ts},1,2,3,4)";
            ts = ts + 100;
            string insert2 = $"insert into {tableName} values({ts},-1,-2,-3,-4)";

            UtilsTools.ExecuteUpdate(conn, insert1);
            UtilsTools.ExecuteUpdate(conn, insert2);
            Thread.Sleep(500);
        }

    }
}