未验证 提交 535724a5 编写于 作者: X xiaolei li 提交者: GitHub

[TD-14356]<fix>:csharp remove stream_open() && stream_close() (#11083)

上级 338d542d
...@@ -20,9 +20,6 @@ namespace AsyncQueryExample ...@@ -20,9 +20,6 @@ namespace AsyncQueryExample
subscribeSample.RunSubscribeWithCallback(conn, "subscribe_with_callback"); subscribeSample.RunSubscribeWithCallback(conn, "subscribe_with_callback");
subscribeSample.RunSubscribeWithoutCallback(conn, "subscribe_without_callback"); subscribeSample.RunSubscribeWithoutCallback(conn, "subscribe_without_callback");
StreamSample streamSample = new StreamSample();
streamSample.RunStreamOption1(conn, "stream_sample_option1");
UtilsTools.CloseConnection(conn); UtilsTools.CloseConnection(conn);
} }
} }
......
using System;
using TDengineDriver;
using Sample.UtilsTools;
using System.Runtime.InteropServices;
using System.Threading;
using System.Collections.Generic;
using System.Text;
namespace Example
{
public class StreamSample
{
public void RunStreamOption1(IntPtr conn, string table)
{
PrepareData(conn, table);
StreamOpenCallback streamOpenCallback = new StreamOpenCallback(StreamCallback);
IntPtr stream = TDengine.OpenStream(conn, $"select count(*) from {table} interval(1m) sliding(30s)", streamOpenCallback, 0, IntPtr.Zero, null);
if (stream == IntPtr.Zero)
{
throw new Exception("OPenStream failed");
}
else
{
Thread.Sleep(100000);
AddNewData(conn, table, 5,true);
Thread.Sleep(100000);
TDengine.CloseStream(stream);
Console.WriteLine("stream done");
}
}
public void StreamCallback(IntPtr param, IntPtr taosRes, IntPtr taosRow)
{
if (taosRes == IntPtr.Zero || taosRow == IntPtr.Zero)
{
return;
}
else
{
var rowData = new List<Object>();
rowData = UtilsTools.FetchRow(taosRow, taosRes);
int count = 0;
rowData.ForEach((item) =>
{
Console.Write("{0} \t|\t", item.ToString());
count++;
if (count % rowData.Count == 0)
{
Console.WriteLine("");
}
});
}
}
public void PrepareData(IntPtr conn, string tableName)
{
string createTable = $"create table if not exists {tableName} (ts timestamp,i8 tinyint,i16 smallint,i32 int,i64 bigint);";
UtilsTools.ExecuteUpdate(conn, createTable);
AddNewData(conn, tableName, 5);
}
public void AddNewData(IntPtr conn, string tableName, int numRows,bool interval = false)
{
long ts = 1646150410100;
Random rs = new Random();
StringBuilder insert = new StringBuilder();
Random rd = new Random();
for (int i = 0; i < numRows; i++)
{
insert.Append("insert into ");
insert.Append(tableName);
insert.Append(" values ");
insert.Append('(');
insert.Append(ts);
insert.Append(',');
insert.Append(rs.Next(sbyte.MinValue+1, sbyte.MaxValue));
insert.Append(',');
insert.Append(rs.Next(short.MinValue+1, short.MaxValue));
insert.Append(',');
insert.Append(rs.Next(int.MinValue+1, int.MaxValue));
insert.Append(',');
insert.Append(rs.Next(int.MinValue+1, int.MaxValue));
insert.Append(')');
UtilsTools.ExecuteUpdate(conn, insert.ToString());
insert.Clear();
ts += rd.Next(10000, 100000);
if( interval)
{
Thread.Sleep(rs.Next(100,300) * i);
}
else
{
continue;
}
}
}
}
}
\ No newline at end of file
...@@ -179,8 +179,6 @@ namespace TDengineDriver ...@@ -179,8 +179,6 @@ namespace TDengineDriver
/// <param name="param"> Additional parameters supplied by the client when taos_subscribe is called.</param> /// <param name="param"> Additional parameters supplied by the client when taos_subscribe is called.</param>
/// <param name="code"> Error code.</param> /// <param name="code"> Error code.</param>
public delegate void SubscribeCallback(IntPtr subscribe, IntPtr tasRes, IntPtr param, int code); public delegate void SubscribeCallback(IntPtr subscribe, IntPtr tasRes, IntPtr param, int code);
public delegate void StreamOpenCallback(IntPtr param, IntPtr taosRes, IntPtr taosRow);
public delegate void StreamOpenCallback2(IntPtr ptr);
public class TDengine public class TDengine
{ {
...@@ -561,33 +559,5 @@ namespace TDengineDriver ...@@ -561,33 +559,5 @@ namespace TDengineDriver
} }
} }
// Stream
/// <summary>
/// Used to open an stream, which can do continuous query.
/// </summary>
/// <param name="taos"> taos connection return by <see cref = "Connect"></param>
/// <param name="sql"> Query statement( query only)</param>
/// <param name="fp"> User defined callback.</param>
/// <param name="stime"> The time when stream computing starts. If it is 0, it means starting from now.
/// If it is not zero, it means starting from the specified time (the number of
/// milliseconds from 1970/1/1 UTC time).
/// </param>
/// <param name="param">First parameter provide by application for callback usage.
/// While callback,this parameter is provided to the application.</param>
/// <param name="callback2">The second callback function which will be called when the continuous query
/// stop automatically.</param>
/// <returns> Return null indicate creation failed, not null for success.</returns>
[DllImport("taos", EntryPoint = "taos_open_stream", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr OpenStream(IntPtr taos, string sql, StreamOpenCallback fp, Int64 stime, IntPtr param, StreamOpenCallback2 callback2);
/// <summary>
/// Used too stop data flow.
/// Remember to stop data flow when you stopped steam computing.
/// </summary>
/// <param name="stream"> Value returned by <see cref = "OpenStream"></param>
[DllImport("taos", EntryPoint = "taos_close_stream", CallingConvention = CallingConvention.Cdecl)]
static extern public void CloseStream(IntPtr stream);
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册