未验证 提交 2dbb44ef 编写于 作者: X xiaolei li 提交者: GitHub

[TD-11008]<feature>:csharp support subscribe and add sample (#10524)

* [TD-11008]<feature>:csharp support subscribe and add sample

* [TD-11008]<feature>:update c# subscribe sample
上级 9ee2b047
......@@ -6,3 +6,5 @@ src/test/XUnitTest/bin/
src/test/XUnitTest/obj/
src/test/doc/
NugetPackTest/
examples/bin/
examples/obj/
......@@ -16,6 +16,9 @@ namespace AsyncQueryExample
AsyncQuerySample asyncQuery = new AsyncQuerySample();
asyncQuery.RunQueryAsync(conn,"query_async");
SubscribeSample subscribeSample = new SubscribeSample();
subscribeSample.RunSubscribeWithCallback(conn, "subscribe_with_callback");
subscribeSample.RunSubscribeWithoutCallback(conn, "subscribe_without_callback");
UtilsTools.CloseConnection(conn);
}
......
using System;
using TDengineDriver;
using Sample.UtilsTools;
using System.Runtime.InteropServices;
using System.Collections.Generic;
using System.Threading;
namespace Example
{
public class SubscribeSample
{
long ts = 1646150410000;
public void RunSubscribeWithCallback(IntPtr conn, string table)
{
PrepareData(conn, table);
string topic = $"{table}_topic";
string sql = $"select * from {table}";
SubscribeCallback subscribeCallback = new SubscribeCallback(SubCallback);
// Subscribe from earliest timestamp in the table.
IntPtr subscribe = TDengine.Subscribe(conn, true, topic, sql, subscribeCallback, IntPtr.Zero, 1000);
// Add new data.
for (int i = 0; i < 4; i++)
{
InsertData(conn, table);
}
Console.WriteLine("Unsubscribe and keep the subscribe progress ");
TDengine.Unsubscribe(subscribe, true);
Console.WriteLine("Subscribe from last subscribe progress");
subscribe = TDengine.Subscribe(conn, false, topic, sql, subscribeCallback, IntPtr.Zero, 1000);
for (int i = 0; i < 4; i++)
{
InsertData(conn, table);
}
Console.WriteLine("Unsubscribe and remove the subscribe progress ");
TDengine.Unsubscribe(subscribe, false);
}
public void RunSubscribeWithoutCallback(IntPtr conn, string table)
{
PrepareData(conn, table);
string topic = $"{table}_topic";
string sql = $"select * from {table}";
IntPtr subscribe = TDengine.Subscribe(conn, true, topic, sql, null, IntPtr.Zero, 1000);
Console.WriteLine("consume from begin");
//Don't release this TAO_RES and end of the subscribe application,other wise will lead crash.
IntPtr taosRes = TDengine.Consume(subscribe);
UtilsTools.DisplayRes(taosRes);
for (int i = 0; i < 3; i++)
{
InsertData(conn, table);
}
Console.WriteLine("consume new data");
taosRes = TDengine.Consume(subscribe);
UtilsTools.DisplayRes(taosRes);
Console.WriteLine("Unsubscribe and keep progress");
TDengine.Unsubscribe(subscribe, true);
// Subscribe from last subscribe progress.
subscribe = TDengine.Subscribe(conn, false, topic, sql, null, IntPtr.Zero, 1000);
for (int i = 0; i < 3; i++)
{
InsertData(conn, table);
Console.WriteLine($"Consume {i+1} time");
taosRes = TDengine.Consume(subscribe);
// The interval between two consume should greater than "interval" pass in Subscribe().
// Otherwise consume will be blocked.
Thread.Sleep(1000);
UtilsTools.DisplayRes(taosRes);
}
TDengine.Unsubscribe(subscribe, false);
TDengine.FreeResult(taosRes);
}
public void SubCallback(IntPtr subscribe, IntPtr taosRes, IntPtr param, int code)
{
if (code == 0 && taosRes != IntPtr.Zero)
{
// cannot free taosRes using taosRes, otherwise will cause crash.
Console.WriteLine($"Display taosRes in subscribe callback");
UtilsTools.DisplayRes(taosRes);
}
else
{
Console.WriteLine($"async query data failed, failed code {code}");
}
}
public void PrepareData(IntPtr conn, string tableName)
{
string createTable = $"create table if not exists {tableName} (ts timestamp,i8 tinyint,i16 smallint,i32 int,i64 bigint);";
string insert1 = $"insert into {tableName} values({ts},1,2,3,4)";
UtilsTools.ExecuteUpdate(conn, createTable);
UtilsTools.ExecuteUpdate(conn, insert1);
}
public void InsertData(IntPtr conn, string tableName)
{
ts = ts + 100;
string insert1 = $"insert into {tableName} values({ts},1,2,3,4)";
ts = ts + 100;
string insert2 = $"insert into {tableName} values({ts},-1,-2,-3,-4)";
UtilsTools.ExecuteUpdate(conn, insert1);
UtilsTools.ExecuteUpdate(conn, insert2);
Thread.Sleep(500);
}
}
}
......@@ -41,7 +41,7 @@ namespace Sample.UtilsTools
}
else if (OperatingSystem.IsOSPlatform("macOS"))
{
configDir = "/etc/taos";
configDir = "/usr/local/etc/taos";
}
return configDir;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册