From 2dbb44efff6e61556b1e9c6f7a671eff06a5fc4d Mon Sep 17 00:00:00 2001 From: xiaolei li <85657333+xleili@users.noreply.github.com> Date: Thu, 3 Mar 2022 17:27:09 +0800 Subject: [PATCH] [TD-11008]:csharp support subscribe and add sample (#10524) * [TD-11008]:csharp support subscribe and add sample * [TD-11008]:update c# subscribe sample --- src/connector/C#/.gitignore | 4 +- src/connector/C#/examples/Main.cs | 5 +- src/connector/C#/examples/SubscribeSample.cs | 115 +++++++++++++++++++ src/connector/C#/examples/lib/Utils.cs | 2 +- 4 files changed, 123 insertions(+), 3 deletions(-) create mode 100644 src/connector/C#/examples/SubscribeSample.cs diff --git a/src/connector/C#/.gitignore b/src/connector/C#/.gitignore index 9564987077..80558c70a1 100644 --- a/src/connector/C#/.gitignore +++ b/src/connector/C#/.gitignore @@ -5,4 +5,6 @@ src/test/FunctionTest/obj/ src/test/XUnitTest/bin/ src/test/XUnitTest/obj/ src/test/doc/ -NugetPackTest/ \ No newline at end of file +NugetPackTest/ +examples/bin/ +examples/obj/ diff --git a/src/connector/C#/examples/Main.cs b/src/connector/C#/examples/Main.cs index 0f1db17766..9d2ab85a87 100644 --- a/src/connector/C#/examples/Main.cs +++ b/src/connector/C#/examples/Main.cs @@ -15,7 +15,10 @@ namespace AsyncQueryExample AsyncQuerySample asyncQuery = new AsyncQuerySample(); asyncQuery.RunQueryAsync(conn,"query_async"); - + + SubscribeSample subscribeSample = new SubscribeSample(); + subscribeSample.RunSubscribeWithCallback(conn, "subscribe_with_callback"); + subscribeSample.RunSubscribeWithoutCallback(conn, "subscribe_without_callback"); UtilsTools.CloseConnection(conn); } diff --git a/src/connector/C#/examples/SubscribeSample.cs b/src/connector/C#/examples/SubscribeSample.cs new file mode 100644 index 0000000000..446b37f6a2 --- /dev/null +++ b/src/connector/C#/examples/SubscribeSample.cs @@ -0,0 +1,115 @@ +using System; +using TDengineDriver; +using Sample.UtilsTools; +using System.Runtime.InteropServices; +using System.Collections.Generic; +using System.Threading; + +namespace Example +{ + + public class SubscribeSample + { + long ts = 1646150410000; + public void RunSubscribeWithCallback(IntPtr conn, string table) + { + PrepareData(conn, table); + string topic = $"{table}_topic"; + string sql = $"select * from {table}"; + SubscribeCallback subscribeCallback = new SubscribeCallback(SubCallback); + + // Subscribe from earliest timestamp in the table. + IntPtr subscribe = TDengine.Subscribe(conn, true, topic, sql, subscribeCallback, IntPtr.Zero, 1000); + + // Add new data. + for (int i = 0; i < 4; i++) + { + InsertData(conn, table); + } + Console.WriteLine("Unsubscribe and keep the subscribe progress "); + TDengine.Unsubscribe(subscribe, true); + + Console.WriteLine("Subscribe from last subscribe progress"); + subscribe = TDengine.Subscribe(conn, false, topic, sql, subscribeCallback, IntPtr.Zero, 1000); + for (int i = 0; i < 4; i++) + { + InsertData(conn, table); + } + Console.WriteLine("Unsubscribe and remove the subscribe progress "); + TDengine.Unsubscribe(subscribe, false); + } + + public void RunSubscribeWithoutCallback(IntPtr conn, string table) + { + + PrepareData(conn, table); + string topic = $"{table}_topic"; + string sql = $"select * from {table}"; + IntPtr subscribe = TDengine.Subscribe(conn, true, topic, sql, null, IntPtr.Zero, 1000); + Console.WriteLine("consume from begin"); + //Don't release this TAO_RES and end of the subscribe application,other wise will lead crash. + IntPtr taosRes = TDengine.Consume(subscribe); + UtilsTools.DisplayRes(taosRes); + for (int i = 0; i < 3; i++) + { + InsertData(conn, table); + } + Console.WriteLine("consume new data"); + taosRes = TDengine.Consume(subscribe); + UtilsTools.DisplayRes(taosRes); + Console.WriteLine("Unsubscribe and keep progress"); + TDengine.Unsubscribe(subscribe, true); + + // Subscribe from last subscribe progress. + subscribe = TDengine.Subscribe(conn, false, topic, sql, null, IntPtr.Zero, 1000); + for (int i = 0; i < 3; i++) + { + InsertData(conn, table); + Console.WriteLine($"Consume {i+1} time"); + taosRes = TDengine.Consume(subscribe); + // The interval between two consume should greater than "interval" pass in Subscribe(). + // Otherwise consume will be blocked. + Thread.Sleep(1000); + UtilsTools.DisplayRes(taosRes); + } + TDengine.Unsubscribe(subscribe, false); + TDengine.FreeResult(taosRes); + } + + public void SubCallback(IntPtr subscribe, IntPtr taosRes, IntPtr param, int code) + { + if (code == 0 && taosRes != IntPtr.Zero) + { + // cannot free taosRes using taosRes, otherwise will cause crash. + Console.WriteLine($"Display taosRes in subscribe callback"); + UtilsTools.DisplayRes(taosRes); + } + else + { + Console.WriteLine($"async query data failed, failed code {code}"); + } + + } + public void PrepareData(IntPtr conn, string tableName) + { + string createTable = $"create table if not exists {tableName} (ts timestamp,i8 tinyint,i16 smallint,i32 int,i64 bigint);"; + string insert1 = $"insert into {tableName} values({ts},1,2,3,4)"; + + UtilsTools.ExecuteUpdate(conn, createTable); + UtilsTools.ExecuteUpdate(conn, insert1); + } + + public void InsertData(IntPtr conn, string tableName) + { + ts = ts + 100; + string insert1 = $"insert into {tableName} values({ts},1,2,3,4)"; + ts = ts + 100; + string insert2 = $"insert into {tableName} values({ts},-1,-2,-3,-4)"; + + UtilsTools.ExecuteUpdate(conn, insert1); + UtilsTools.ExecuteUpdate(conn, insert2); + Thread.Sleep(500); + } + + } +} diff --git a/src/connector/C#/examples/lib/Utils.cs b/src/connector/C#/examples/lib/Utils.cs index b549406bee..d5c1fcd2f3 100644 --- a/src/connector/C#/examples/lib/Utils.cs +++ b/src/connector/C#/examples/lib/Utils.cs @@ -41,7 +41,7 @@ namespace Sample.UtilsTools } else if (OperatingSystem.IsOSPlatform("macOS")) { - configDir = "/etc/taos"; + configDir = "/usr/local/etc/taos"; } return configDir; } -- GitLab