未验证 提交 94c19686 编写于 作者: X xiaolei li 提交者: GitHub

Feature/xiaolei/td 13569 csharp support async query (#10488)

* [TD-13569]<feature>:csharp async query

* [TD-13569]<feature>:csharp support Async query

* [TD-13569]<feature>:remove some useless method in example util.cs

* [TD-13569]<feature>fix indent
上级 2a46ffba
using System;
using Sample.UtilsTools;
using System.Runtime.InteropServices;
using TDengineDriver;
using Example;
using System.Collections.Generic;
namespace AsyncQueryExample
{
public class EntryPoint
{
static void Main(string[] args)
{
IntPtr conn = UtilsTools.TDConnection();
AsyncQuerySample asyncQuery = new AsyncQuerySample();
asyncQuery.RunQueryAsync(conn,"query_async");
UtilsTools.CloseConnection(conn);
}
}
}
using System;
using TDengineDriver;
using Sample.UtilsTools;
using System.Runtime.InteropServices;
using System.Threading;
namespace Example
{
public class AsyncQuerySample
{
public void RunQueryAsync(IntPtr conn, string table)
{
QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback);
PrepareData(conn, table);
Console.WriteLine($"Start calling QueryAsync(),query {table}'s data asynchronously.");
TDengine.QueryAsync(conn, $"select * from {table}", queryAsyncCallback, IntPtr.Zero);
Thread.Sleep(2000);
Console.WriteLine("QueryAsync done.");
}
//prepare the data(table and insert data)
public void PrepareData(IntPtr conn, string tableName)
{
string createTable = $"create table if not exists {tableName} (ts timestamp,i8 tinyint,i16 smallint,i32 int,i64 bigint)tags(t_bnry binary(50), t_nchr nchar(50));";
string insert1 = $"insert into {tableName}_s01 using {tableName} tags('tag1','标签1') values(now,1,2,3,4)(now+1m,5,6,7,8)(now+2m,9,0,-1,-2)(now+3m,-3,-4,-5,-6)(now+4m,-7,-8,-9,0)";
string insert2 = $"insert into {tableName}_s02 using {tableName} tags('tag2','标签2') values(now,1,2,3,4)(now+1m,5,6,7,8)(now+2m,9,0,-1,-2)(now+3m,-3,-4,-5,-6)(now+4m,-7,-8,-9,0)";
string insert3 = $"insert into {tableName}_s03 using {tableName} tags('tag3','标签3') values(now,1,2,3,4)(now+1m,5,6,7,8)(now+2m,9,0,-1,-2)(now+3m,-3,-4,-5,-6)(now+4m,-7,-8,-9,0)";
string insert4 = $"insert into {tableName}_s04 using {tableName} tags('tag4','标签4') values(now,1,2,3,4)(now+1m,5,6,7,8)(now+2m,9,0,-1,-2)(now+3m,-3,-4,-5,-6)(now+4m,-7,-8,-9,0)";
string insert5 = $"insert into {tableName}_s05 using {tableName} tags('tag5','标签5') values(now,1,2,3,4)(now+1m,5,6,7,8)(now+2m,9,0,-1,-2)(now+3m,-3,-4,-5,-6)(now+4m,-7,-8,-9,0)";
UtilsTools.ExecuteUpdate(conn, createTable);
UtilsTools.ExecuteUpdate(conn, insert1);
Thread.Sleep(100);
UtilsTools.ExecuteUpdate(conn, insert2);
Thread.Sleep(100);
UtilsTools.ExecuteUpdate(conn, insert3);
Thread.Sleep(100);
UtilsTools.ExecuteUpdate(conn, insert4);
Thread.Sleep(100);
UtilsTools.ExecuteUpdate(conn, insert5);
}
public void QueryCallback(IntPtr param, IntPtr taosRes, int code)
{
if (code == 0 && taosRes != IntPtr.Zero)
{
FetchRowAsyncCallback fetchRowAsyncCallback = new FetchRowAsyncCallback(FetchCallback);
TDengine.FetchRowAsync(taosRes, fetchRowAsyncCallback, param);
}
else
{
Console.WriteLine($"async query data failed, failed code {code}");
}
}
// Iteratively call this interface until "numOfRows" is no greater than 0.
public void FetchCallback(IntPtr param, IntPtr taosRes, int numOfRows)
{
if (numOfRows > 0)
{
Console.WriteLine($"{numOfRows} rows async retrieved");
UtilsTools.DisplayRes(taosRes);
TDengine.FetchRowAsync(taosRes, FetchCallback, param);
}
else
{
if (numOfRows == 0)
{
Console.WriteLine("async retrieve complete.");
}
else
{
Console.WriteLine($"FetchRowAsync callback error, error code {numOfRows}");
}
TDengine.FreeResult(taosRes);
}
}
}
}
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\src\TDengineDriver\TDengineDriver.csproj" />
</ItemGroup>
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>
</Project>
using System;
using TDengineDriver;
using System.Runtime.InteropServices;
using System.Text;
using System.Collections.Generic;
namespace Sample.UtilsTools
{
public class UtilsTools
{
static string ip = "127.0.0.1";
static string user = "root";
static string password = "taosdata";
static string db = "";
static short port = 0;
static string globalDbName = "csharp_example_db";
//get a tdengine connection
public static IntPtr TDConnection()
{
TDengine.Options((int)TDengineInitOption.TDDB_OPTION_CONFIGDIR, GetConfigPath());
TDengine.Options((int)TDengineInitOption.TDDB_OPTION_SHELL_ACTIVITY_TIMER, "60");
TDengine.Init();
IntPtr conn = TDengine.Connect(ip, user, password, db, port);
UtilsTools.ExecuteUpdate(conn, $"drop database if exists {globalDbName}");
UtilsTools.ExecuteUpdate(conn, $"create database if not exists {globalDbName} keep 3650");
UtilsTools.ExecuteUpdate(conn, $"use {globalDbName}");
return conn;
}
//get taos.cfg file based on different os
public static string GetConfigPath()
{
string configDir = "";
if (OperatingSystem.IsOSPlatform("Windows"))
{
configDir = "C:/TDengine/cfg";
}
else if (OperatingSystem.IsOSPlatform("Linux"))
{
configDir = "/etc/taos";
}
else if (OperatingSystem.IsOSPlatform("macOS"))
{
configDir = "/etc/taos";
}
return configDir;
}
public static IntPtr ExecuteQuery(IntPtr conn, String sql)
{
IntPtr res = TDengine.Query(conn, sql);
if (!IsValidResult(res))
{
Console.Write(sql.ToString() + " failure, ");
ExitProgram();
}
else
{
Console.WriteLine(sql.ToString() + " success");
}
return res;
}
public static IntPtr ExecuteErrorQuery(IntPtr conn, String sql)
{
IntPtr res = TDengine.Query(conn, sql);
if (!IsValidResult(res))
{
Console.Write(sql.ToString() + " failure, ");
ExitProgram();
}
else
{
Console.WriteLine(sql.ToString() + " success");
}
return res;
}
public static void ExecuteUpdate(IntPtr conn, String sql)
{
IntPtr res = TDengine.Query(conn, sql);
if (!IsValidResult(res))
{
Console.Write(sql.ToString() + " failure, ");
ExitProgram();
}
else
{
Console.WriteLine(sql.ToString() + " success");
}
TDengine.FreeResult(res);
}
public static void DisplayRes(IntPtr res)
{
if (!IsValidResult(res))
{
ExitProgram();
}
List<TDengineMeta> metas = GetResField(res);
int fieldCount = metas.Count;
// metas.ForEach((item) => { Console.Write("{0} ({1}) \t|\t", item.name, item.size); });
List<Object> datas = QueryRes(res, metas);
for (int index = 0; index < datas.Count; index++)
{
if (index % fieldCount == 0 && index != 0)
{
Console.WriteLine("");
}
Console.Write("{0} \t|\t", datas[index].ToString());
}
Console.WriteLine("");
}
public static List<List<Object>> GetResultSet(IntPtr res)
{
List<List<Object>> result = new List<List<Object>>();
List<Object> colName = new List<Object>();
List<Object> dataRaw = new List<Object>();
if (!IsValidResult(res))
{
ExitProgram();
}
List<TDengineMeta> metas = GetResField(res);
result.Add(colName);
dataRaw = QueryRes(res, metas);
result.Add(dataRaw);
if (TDengine.ErrorNo(res) != 0)
{
Console.Write("Query is not complete, Error {0:G}", TDengine.ErrorNo(res), TDengine.Error(res));
}
return result;
}
public static bool IsValidResult(IntPtr res)
{
if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
{
if (res != IntPtr.Zero)
{
Console.Write("reason: " + TDengine.Error(res));
return false;
}
Console.WriteLine("");
return false;
}
return true;
}
public static void CloseConnection(IntPtr conn)
{
ExecuteUpdate(conn, $"drop database if exists {globalDbName}");
if (conn != IntPtr.Zero)
{
if (TDengine.Close(conn) == 0)
{
Console.WriteLine("close connection sucess");
}
else
{
Console.WriteLine("close Connection failed");
}
}
}
public static List<TDengineMeta> GetResField(IntPtr res)
{
List<TDengineMeta> metas = TDengine.FetchFields(res);
return metas;
}
public static void ExitProgram()
{
TDengine.Cleanup();
System.Environment.Exit(0);
}
public static List<Object> GetResData(IntPtr res)
{
List<Object> colName = new List<Object>();
List<Object> dataRaw = new List<Object>();
if (!IsValidResult(res))
{
ExitProgram();
}
List<TDengineMeta> metas = GetResField(res);
dataRaw = QueryRes(res, metas);
return dataRaw;
}
private static List<Object> QueryRes(IntPtr res, List<TDengineMeta> metas)
{
IntPtr taosRow;
List<Object> dataRaw = new List<Object>();
int fieldCount = metas.Count;
while ((taosRow = TDengine.FetchRows(res)) != IntPtr.Zero)
{
dataRaw.AddRange(FetchRow(taosRow,res));
}
if (TDengine.ErrorNo(res) != 0)
{
Console.Write("Query is not complete, Error {0:G}", TDengine.ErrorNo(res), TDengine.Error(res));
}
// TDengine.FreeResult(res);
Console.WriteLine("");
return dataRaw;
}
public static List<Object> FetchRow(IntPtr taosRow, IntPtr taosRes)//, List<TDengineMeta> metaList, int numOfFiled
{
List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
int numOfFiled = TDengine.FieldCount(taosRes);
List<Object> dataRaw = new List<Object>();
IntPtr colLengthPrt = TDengine.FetchLengths(taosRes);
int[] colLengthArr = new int[numOfFiled];
Marshal.Copy(colLengthPrt, colLengthArr, 0, numOfFiled);
for (int i = 0; i < numOfFiled; i++)
{
TDengineMeta meta = metaList[i];
IntPtr data = Marshal.ReadIntPtr(taosRow, IntPtr.Size * i);
if (data == IntPtr.Zero)
{
dataRaw.Add("NULL");
continue;
}
switch ((TDengineDataType)meta.type)
{
case TDengineDataType.TSDB_DATA_TYPE_BOOL:
bool v1 = Marshal.ReadByte(data) == 0 ? false : true;
dataRaw.Add(v1);
break;
case TDengineDataType.TSDB_DATA_TYPE_TINYINT:
sbyte v2 = (sbyte)Marshal.ReadByte(data);
dataRaw.Add(v2);
break;
case TDengineDataType.TSDB_DATA_TYPE_SMALLINT:
short v3 = Marshal.ReadInt16(data);
dataRaw.Add(v3);
break;
case TDengineDataType.TSDB_DATA_TYPE_INT:
int v4 = Marshal.ReadInt32(data);
dataRaw.Add(v4);
break;
case TDengineDataType.TSDB_DATA_TYPE_BIGINT:
long v5 = Marshal.ReadInt64(data);
dataRaw.Add(v5);
break;
case TDengineDataType.TSDB_DATA_TYPE_FLOAT:
float v6 = (float)Marshal.PtrToStructure(data, typeof(float));
dataRaw.Add(v6);
break;
case TDengineDataType.TSDB_DATA_TYPE_DOUBLE:
double v7 = (double)Marshal.PtrToStructure(data, typeof(double));
dataRaw.Add(v7);
break;
case TDengineDataType.TSDB_DATA_TYPE_BINARY:
string v8 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
dataRaw.Add(v8);
break;
case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP:
long v9 = Marshal.ReadInt64(data);
dataRaw.Add(v9);
break;
case TDengineDataType.TSDB_DATA_TYPE_NCHAR:
string v10 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
dataRaw.Add(v10);
break;
case TDengineDataType.TSDB_DATA_TYPE_UTINYINT:
byte v12 = Marshal.ReadByte(data);
dataRaw.Add(v12.ToString());
break;
case TDengineDataType.TSDB_DATA_TYPE_USMALLINT:
ushort v13 = (ushort)Marshal.ReadInt16(data);
dataRaw.Add(v13);
break;
case TDengineDataType.TSDB_DATA_TYPE_UINT:
uint v14 = (uint)Marshal.ReadInt32(data);
dataRaw.Add(v14);
break;
case TDengineDataType.TSDB_DATA_TYPE_UBIGINT:
ulong v15 = (ulong)Marshal.ReadInt64(data);
dataRaw.Add(v15);
break;
case TDengineDataType.TSDB_DATA_TYPE_JSONTAG:
string v16 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
dataRaw.Add(v16);
break;
default:
dataRaw.Add("nonsupport data type value");
break;
}
}
return dataRaw;
}
}
}
......@@ -144,6 +144,43 @@ namespace TDengineDriver
public int num;
}
/// <summary>
/// User defined callback function for interface "QueryAsync()"
/// ,actually is a delegate in .Net.
/// This function aim to handel the taoRes which points to
/// the caller method's sql resultset.
/// </summary>
/// <param name="param"> This parameter will sent by caller method (QueryAsync()).</param>
/// <param name="taoRes"> This is the retrieved by caller method's sql.</param>
/// <param name="code"> 0 for indicate operation success and negative for operation fail.</param>
public delegate void QueryAsyncCallback(IntPtr param, IntPtr taoRes, int code);
/// <summary>
/// User defined callback function for interface "FetchRowAsync()"
/// ,actually is a delegate in .Net.
/// This callback allow applications to get each row of the
/// batch records by calling FetchRowAsync() forward iteration.
/// After reading all the records in a block, the application needs to continue calling
/// FetchRowAsync() in this callback function to obtain the next batch of records for
/// processing until the number of records
/// </summary>
/// <param name="param">The parameter passed by <see cref="FetchRowAsync"/></param>
/// <param name="taoRes">Query status</param>
/// <param name="numOfRows"> The number of rows of data obtained (not a function of
/// the entire query result set). When the number is zero (the result is returned)
/// or the number of records is negative (the query fails).</param>
public delegate void FetchRowAsyncCallback(IntPtr param, IntPtr taoRes, int numOfRows);
/// <summary>
/// In asynchronous mode, the prototype of the callback function.
/// </summary>
/// <param name="subscribe">Subscription object return by <see cref = "Subscribe"> </param>
/// <param name="tasRes"> Query retrieve result set. (Note there may be no record in the result set.)</param>
/// <param name="param"> Additional parameters supplied by the client when taos_subscribe is called.</param>
/// <param name="code"> Error code.</param>
public delegate void SubscribeCallback(IntPtr subscribe, IntPtr tasRes, IntPtr param, int code);
public delegate void StreamOpenCallback(IntPtr param, IntPtr taosRes, IntPtr taosRow);
public delegate void StreamOpenCallback2(IntPtr ptr);
public class TDengine
{
......@@ -176,12 +213,12 @@ namespace TDengineDriver
// static extern public IntPtr Query(IntPtr conn, string sqlstr);
static extern private IntPtr Query(IntPtr conn, IntPtr byteArr);
static public IntPtr Query(IntPtr conn,string command)
{
IntPtr res = IntPtr.Zero;
static public IntPtr Query(IntPtr conn, string command)
{
IntPtr res = IntPtr.Zero;
IntPtr commandBuffer = Marshal.StringToCoTaskMemUTF8(command);
res = Query(conn,commandBuffer);
res = Query(conn, commandBuffer);
return res;
}
......@@ -411,5 +448,146 @@ namespace TDengineDriver
[DllImport("taos", EntryPoint = "taos_fetch_lengths", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr FetchLengths(IntPtr taos);
// Async Query
/// <summary>
/// This API uses non-blocking call mode.
/// Application can open mutilple tables and manipulate(query or insert) opened table concurrently.
/// So applications must ensure that opetations on the same table is compeletly serialized.
/// Becuase that will cause some query and insert operations cannot be performed.
/// </summary>
/// <param name="taos"> A taos connection return by Connect()</param>
/// <param name="sql">sql command need to execute</param>
/// <param name="fq">User-defined callback function. <see cref="QueryAsyncCallback"/></param>
/// <param name="param">the parameter for callback</param>
[DllImport("taos", EntryPoint = "taos_query_a", CallingConvention = CallingConvention.Cdecl)]
static extern public void QueryAsync(IntPtr taos, string sql, QueryAsyncCallback fq, IntPtr param);
/// <summary>
/// Get the result set of asynchronous queries in batch,
/// which can only be used with QueryAsync().<c>FetchRowAsyncCallback<c>
/// </summary>
/// <param name="taoRes"> The result set returned when backcall QueryAsyncCallback </param>
/// <param name="fq"> Callback function.<see cref="FetchRowAsyncCallback"/></param>
/// <param name="param"> The parameter for callback FetchRowAsyncCallback </param>
[DllImport("taos", EntryPoint = "taos_fetch_rows_a", CallingConvention = CallingConvention.Cdecl)]
static extern public void FetchRowAsync(IntPtr taoRes, FetchRowAsyncCallback fq, IntPtr param);
// Subscribe
/// <summary>
/// This function is used for start subscription service.
/// </summary>
/// <param name="taos"> taos connection return by <see cref = "Connect"></param>
/// <param name="restart">If the subscription is already exists, to decide whether to
/// start over or continue with previous subscription.</param>
/// <param name="topic"> The name of the subscription.(This is the unique identification of the subscription).</param>
/// <param name="sql">The subscribe statement(select only).Only query original data and in positive time sequence.</param>
/// <param name="fq">The callback function when the query result is received.</param>
/// <param name="param"> Additional parameter when calling callback function. System API will pass it to
/// callback function without any operations.It is only used when calling asynchronously,
/// and this parameter should be passed to NULL when calling synchronously</param>
/// <param name="interval">Polling period in milliseconds. During asynchronous call, the callback function will be
/// called periodically according to this parameter; In order to avoid affecting system
/// performance, it is not recommended to set this parameter too small; When calling synchronously,
/// if the interval between two calls to taos_consume is less than this period, the API will block
/// until the interval exceeds this period.</param>
/// <returns>Return null for failure, return subscribe object for success.</returns>
[DllImport("taos", EntryPoint = "taos_subscribe", CallingConvention = CallingConvention.Cdecl)]
static extern private IntPtr Subscribe(IntPtr taos, int restart, string topic, string sql, SubscribeCallback fq, IntPtr param, int interval);
static public IntPtr Subscribe(IntPtr taos, bool restart, string topic, string sql, SubscribeCallback fq, IntPtr param, int interval)
{
if (taos == IntPtr.Zero)
{
Console.WriteLine("taos connect is null,subscribe failed");
throw new Exception("taos connect is null");
}
else
{
IntPtr subPtr = Subscribe(taos, restart == true ? 1 : 0, topic, sql, fq, param, interval);
return subPtr;
}
}
/// <summary>
/// Only synchronous mode, this function is used to get the result of subscription.
/// If the interval between two calls to taos_consume is less than the polling
/// cycle of the subscription, the API will block until the interval exceeds this
/// cycle. If a new record arrives in the database, the API will return the latest
/// record, otherwise it will return an empty result set with no records.
/// If the return value is NULL, it indicates a system error.
/// </summary>
/// <param name="subscribe"> Subscription object return by <see cref = "Subscribe">. </param>
/// <returns></returns>
[DllImport("taos", EntryPoint = "taos_consume", CallingConvention = CallingConvention.Cdecl)]
static extern private IntPtr TaosConsume(IntPtr subscribe);
static public IntPtr Consume(IntPtr subscribe)
{
IntPtr res = IntPtr.Zero;
if (subscribe == IntPtr.Zero)
{
Console.WriteLine("Object subscribe is null,please subscribe first.");
throw new Exception("Object subscribe is null");
}
else
{
res = TaosConsume(subscribe);
}
return res;
}
/// <summary>
/// Unsubscribe.
/// </summary>
/// <param name="subscribe"> Subscription object return by <see cref = "Subscribe">.</param>
/// <param name="keep"> If it is not 0, the API will keep the progress of subscription,
/// and the and the subsequent call to taos_subscribe can continue
/// based on this progress; otherwise, the progress information will
/// be deleted and the data can only be read again.
/// </param>
[DllImport("taos", EntryPoint = "taos_unsubscribe", CallingConvention = CallingConvention.Cdecl)]
static extern private void Unsubscribe(IntPtr subscribe, int keep);
static public void Unsubscribe(IntPtr subscribe, bool keep)
{
if (subscribe == IntPtr.Zero)
{
Console.WriteLine("subscribe is null, close Unsubscribe failed");
throw new Exception("Object subscribe is null");
}
else
{
Unsubscribe(subscribe, keep == true ? 1 : 0);
Console.WriteLine("Unsubscribe success.");
}
}
// Stream
/// <summary>
/// Used to open an stream, which can do continuous query.
/// </summary>
/// <param name="taos"> taos connection return by <see cref = "Connect"></param>
/// <param name="sql"> Query statement( query only)</param>
/// <param name="fp"> User defined callback.</param>
/// <param name="stime"> The time when stream computing starts. If it is 0, it means starting from now.
/// If it is not zero, it means starting from the specified time (the number of
/// milliseconds from 1970/1/1 UTC time).
/// </param>
/// <param name="param">First parameter provide by application for callback usage.
/// While callback,this parameter is provided to the application.</param>
/// <param name="callback2">The second callback function which will be caled when the continuous query
/// stop automatically.</param>
/// <returns> Return null indicate creation failed, not null for success.</returns>
[DllImport("taos", EntryPoint = "taos_open_stream", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr OpenStream(IntPtr taos, string sql, StreamOpenCallback fp, Int64 stime, IntPtr param, StreamOpenCallback2 callback2);
/// <summary>
/// Used too stop data flow.
/// Remember to stop data flow when you stopped steam computing.
/// </summary>
/// <param name="stream"> Value returned by <see cref = "OpenStream"></param>
[DllImport("taos", EntryPoint = "taos_close_stream", CallingConvention = CallingConvention.Cdecl)]
static extern public void CloseStream(IntPtr stream);
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net5;netstandard2.0;net45</TargetFrameworks>
<TargetFrameworks>net5;netstandard2.1;</TargetFrameworks>
<PackageId>TDengine.Connector</PackageId>
<PackageIcon>logo.jpg</PackageIcon>
<Version>1.0.4</Version>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册