未验证 提交 baff522c 编写于 作者: X xiaolei li 提交者: GitHub

[TD-11160]<feature>:csharp support continuous query (#10681)

* [TD-11160]<feature>:csharp support continuous query

* [TD-11160]<feature>:fix indent
上级 92355d64
...@@ -14,12 +14,15 @@ namespace AsyncQueryExample ...@@ -14,12 +14,15 @@ namespace AsyncQueryExample
IntPtr conn = UtilsTools.TDConnection(); IntPtr conn = UtilsTools.TDConnection();
AsyncQuerySample asyncQuery = new AsyncQuerySample(); AsyncQuerySample asyncQuery = new AsyncQuerySample();
asyncQuery.RunQueryAsync(conn,"query_async"); asyncQuery.RunQueryAsync(conn, "query_async");
SubscribeSample subscribeSample = new SubscribeSample(); SubscribeSample subscribeSample = new SubscribeSample();
subscribeSample.RunSubscribeWithCallback(conn, "subscribe_with_callback"); subscribeSample.RunSubscribeWithCallback(conn, "subscribe_with_callback");
subscribeSample.RunSubscribeWithoutCallback(conn, "subscribe_without_callback"); subscribeSample.RunSubscribeWithoutCallback(conn, "subscribe_without_callback");
StreamSample streamSample = new StreamSample();
streamSample.RunStreamOption1(conn, "stream_sample_option1");
UtilsTools.CloseConnection(conn); UtilsTools.CloseConnection(conn);
} }
} }
......
using System;
using TDengineDriver;
using Sample.UtilsTools;
using System.Runtime.InteropServices;
using System.Threading;
using System.Collections.Generic;
using System.Text;
namespace Example
{
public class StreamSample
{
public void RunStreamOption1(IntPtr conn, string table)
{
PrepareData(conn, table);
StreamOpenCallback streamOpenCallback = new StreamOpenCallback(StreamCallback);
IntPtr stream = TDengine.OpenStream(conn, $"select count(*) from {table} interval(1m) sliding(30s)", streamOpenCallback, 0, IntPtr.Zero, null);
if (stream == IntPtr.Zero)
{
throw new Exception("OPenStream failed");
}
else
{
Thread.Sleep(100000);
AddNewData(conn, table, 5,true);
Thread.Sleep(100000);
TDengine.CloseStream(stream);
Console.WriteLine("stream done");
}
}
public void StreamCallback(IntPtr param, IntPtr taosRes, IntPtr taosRow)
{
if (taosRes == IntPtr.Zero || taosRow == IntPtr.Zero)
{
return;
}
else
{
var rowData = new List<Object>();
rowData = UtilsTools.FetchRow(taosRow, taosRes);
int count = 0;
rowData.ForEach((item) =>
{
Console.Write("{0} \t|\t", item.ToString());
count++;
if (count % rowData.Count == 0)
{
Console.WriteLine("");
}
});
}
}
public void PrepareData(IntPtr conn, string tableName)
{
string createTable = $"create table if not exists {tableName} (ts timestamp,i8 tinyint,i16 smallint,i32 int,i64 bigint);";
UtilsTools.ExecuteUpdate(conn, createTable);
AddNewData(conn, tableName, 5);
}
public void AddNewData(IntPtr conn, string tableName, int numRows,bool interval = false)
{
long ts = 1646150410100;
Random rs = new Random();
StringBuilder insert = new StringBuilder();
Random rd = new Random();
for (int i = 0; i < numRows; i++)
{
insert.Append("insert into ");
insert.Append(tableName);
insert.Append(" values ");
insert.Append('(');
insert.Append(ts);
insert.Append(',');
insert.Append(rs.Next(sbyte.MinValue+1, sbyte.MaxValue));
insert.Append(',');
insert.Append(rs.Next(short.MinValue+1, short.MaxValue));
insert.Append(',');
insert.Append(rs.Next(int.MinValue+1, int.MaxValue));
insert.Append(',');
insert.Append(rs.Next(int.MinValue+1, int.MaxValue));
insert.Append(')');
UtilsTools.ExecuteUpdate(conn, insert.ToString());
insert.Clear();
ts += rd.Next(10000, 100000);
if( interval)
{
Thread.Sleep(rs.Next(100,300) * i);
}
else
{
continue;
}
}
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册