From 94c196869f1e595ffa67763ed74fbfd328ae4d0d Mon Sep 17 00:00:00 2001 From: xiaolei li <85657333+xleili@users.noreply.github.com> Date: Thu, 3 Mar 2022 13:38:56 +0800 Subject: [PATCH] Feature/xiaolei/td 13569 csharp support async query (#10488) * [TD-13569]:csharp async query * [TD-13569]:csharp support Async query * [TD-13569]:remove some useless method in example util.cs * [TD-13569]fix indent --- src/connector/C#/examples/Main.cs | 23 ++ src/connector/C#/examples/QueryAsyncSample.cs | 84 +++++ src/connector/C#/examples/examples.csproj | 11 + src/connector/C#/examples/lib/Utils.cs | 307 ++++++++++++++++++ .../C#/src/TDengineDriver/TDengineDriver.cs | 188 ++++++++++- .../src/TDengineDriver/TDengineDriver.csproj | 2 +- 6 files changed, 609 insertions(+), 6 deletions(-) create mode 100644 src/connector/C#/examples/Main.cs create mode 100644 src/connector/C#/examples/QueryAsyncSample.cs create mode 100644 src/connector/C#/examples/examples.csproj create mode 100644 src/connector/C#/examples/lib/Utils.cs diff --git a/src/connector/C#/examples/Main.cs b/src/connector/C#/examples/Main.cs new file mode 100644 index 0000000000..0f1db17766 --- /dev/null +++ b/src/connector/C#/examples/Main.cs @@ -0,0 +1,23 @@ +using System; +using Sample.UtilsTools; +using System.Runtime.InteropServices; +using TDengineDriver; +using Example; +using System.Collections.Generic; + +namespace AsyncQueryExample +{ + public class EntryPoint + { + static void Main(string[] args) + { + IntPtr conn = UtilsTools.TDConnection(); + + AsyncQuerySample asyncQuery = new AsyncQuerySample(); + asyncQuery.RunQueryAsync(conn,"query_async"); + + + UtilsTools.CloseConnection(conn); + } + } +} diff --git a/src/connector/C#/examples/QueryAsyncSample.cs b/src/connector/C#/examples/QueryAsyncSample.cs new file mode 100644 index 0000000000..cfd1f7a0c5 --- /dev/null +++ b/src/connector/C#/examples/QueryAsyncSample.cs @@ -0,0 +1,84 @@ +using System; +using TDengineDriver; +using Sample.UtilsTools; +using System.Runtime.InteropServices; +using System.Threading; + +namespace Example +{ + public class AsyncQuerySample + { + public void RunQueryAsync(IntPtr conn, string table) + { + QueryAsyncCallback queryAsyncCallback = new QueryAsyncCallback(QueryCallback); + PrepareData(conn, table); + Console.WriteLine($"Start calling QueryAsync(),query {table}'s data asynchronously."); + TDengine.QueryAsync(conn, $"select * from {table}", queryAsyncCallback, IntPtr.Zero); + Thread.Sleep(2000); + Console.WriteLine("QueryAsync done."); + + } + + //prepare the data(table and insert data) + public void PrepareData(IntPtr conn, string tableName) + { + string createTable = $"create table if not exists {tableName} (ts timestamp,i8 tinyint,i16 smallint,i32 int,i64 bigint)tags(t_bnry binary(50), t_nchr nchar(50));"; + string insert1 = $"insert into {tableName}_s01 using {tableName} tags('tag1','标签1') values(now,1,2,3,4)(now+1m,5,6,7,8)(now+2m,9,0,-1,-2)(now+3m,-3,-4,-5,-6)(now+4m,-7,-8,-9,0)"; + string insert2 = $"insert into {tableName}_s02 using {tableName} tags('tag2','标签2') values(now,1,2,3,4)(now+1m,5,6,7,8)(now+2m,9,0,-1,-2)(now+3m,-3,-4,-5,-6)(now+4m,-7,-8,-9,0)"; + string insert3 = $"insert into {tableName}_s03 using {tableName} tags('tag3','标签3') values(now,1,2,3,4)(now+1m,5,6,7,8)(now+2m,9,0,-1,-2)(now+3m,-3,-4,-5,-6)(now+4m,-7,-8,-9,0)"; + string insert4 = $"insert into {tableName}_s04 using {tableName} tags('tag4','标签4') values(now,1,2,3,4)(now+1m,5,6,7,8)(now+2m,9,0,-1,-2)(now+3m,-3,-4,-5,-6)(now+4m,-7,-8,-9,0)"; + string insert5 = $"insert into {tableName}_s05 using {tableName} tags('tag5','标签5') values(now,1,2,3,4)(now+1m,5,6,7,8)(now+2m,9,0,-1,-2)(now+3m,-3,-4,-5,-6)(now+4m,-7,-8,-9,0)"; + + UtilsTools.ExecuteUpdate(conn, createTable); + UtilsTools.ExecuteUpdate(conn, insert1); + Thread.Sleep(100); + UtilsTools.ExecuteUpdate(conn, insert2); + Thread.Sleep(100); + UtilsTools.ExecuteUpdate(conn, insert3); + Thread.Sleep(100); + UtilsTools.ExecuteUpdate(conn, insert4); + Thread.Sleep(100); + UtilsTools.ExecuteUpdate(conn, insert5); + + } + + public void QueryCallback(IntPtr param, IntPtr taosRes, int code) + { + if (code == 0 && taosRes != IntPtr.Zero) + { + FetchRowAsyncCallback fetchRowAsyncCallback = new FetchRowAsyncCallback(FetchCallback); + TDengine.FetchRowAsync(taosRes, fetchRowAsyncCallback, param); + } + else + { + Console.WriteLine($"async query data failed, failed code {code}"); + } + } + + // Iteratively call this interface until "numOfRows" is no greater than 0. + public void FetchCallback(IntPtr param, IntPtr taosRes, int numOfRows) + { + if (numOfRows > 0) + { + Console.WriteLine($"{numOfRows} rows async retrieved"); + UtilsTools.DisplayRes(taosRes); + TDengine.FetchRowAsync(taosRes, FetchCallback, param); + } + else + { + if (numOfRows == 0) + { + Console.WriteLine("async retrieve complete."); + + } + else + { + Console.WriteLine($"FetchRowAsync callback error, error code {numOfRows}"); + } + TDengine.FreeResult(taosRes); + } + } + } + + +} \ No newline at end of file diff --git a/src/connector/C#/examples/examples.csproj b/src/connector/C#/examples/examples.csproj new file mode 100644 index 0000000000..862df00f04 --- /dev/null +++ b/src/connector/C#/examples/examples.csproj @@ -0,0 +1,11 @@ + + + + + + + Exe + net5.0 + + + diff --git a/src/connector/C#/examples/lib/Utils.cs b/src/connector/C#/examples/lib/Utils.cs new file mode 100644 index 0000000000..b549406bee --- /dev/null +++ b/src/connector/C#/examples/lib/Utils.cs @@ -0,0 +1,307 @@ +using System; +using TDengineDriver; +using System.Runtime.InteropServices; +using System.Text; +using System.Collections.Generic; +namespace Sample.UtilsTools +{ + public class UtilsTools + { + + static string ip = "127.0.0.1"; + static string user = "root"; + static string password = "taosdata"; + static string db = ""; + static short port = 0; + static string globalDbName = "csharp_example_db"; + //get a tdengine connection + public static IntPtr TDConnection() + { + TDengine.Options((int)TDengineInitOption.TDDB_OPTION_CONFIGDIR, GetConfigPath()); + TDengine.Options((int)TDengineInitOption.TDDB_OPTION_SHELL_ACTIVITY_TIMER, "60"); + TDengine.Init(); + + IntPtr conn = TDengine.Connect(ip, user, password, db, port); + UtilsTools.ExecuteUpdate(conn, $"drop database if exists {globalDbName}"); + UtilsTools.ExecuteUpdate(conn, $"create database if not exists {globalDbName} keep 3650"); + UtilsTools.ExecuteUpdate(conn, $"use {globalDbName}"); + return conn; + } + //get taos.cfg file based on different os + public static string GetConfigPath() + { + string configDir = ""; + if (OperatingSystem.IsOSPlatform("Windows")) + { + configDir = "C:/TDengine/cfg"; + } + else if (OperatingSystem.IsOSPlatform("Linux")) + { + configDir = "/etc/taos"; + } + else if (OperatingSystem.IsOSPlatform("macOS")) + { + configDir = "/etc/taos"; + } + return configDir; + } + + public static IntPtr ExecuteQuery(IntPtr conn, String sql) + { + IntPtr res = TDengine.Query(conn, sql); + if (!IsValidResult(res)) + { + Console.Write(sql.ToString() + " failure, "); + ExitProgram(); + } + else + { + Console.WriteLine(sql.ToString() + " success"); + } + return res; + } + + public static IntPtr ExecuteErrorQuery(IntPtr conn, String sql) + { + IntPtr res = TDengine.Query(conn, sql); + if (!IsValidResult(res)) + { + Console.Write(sql.ToString() + " failure, "); + ExitProgram(); + } + else + { + Console.WriteLine(sql.ToString() + " success"); + + } + return res; + } + + public static void ExecuteUpdate(IntPtr conn, String sql) + { + IntPtr res = TDengine.Query(conn, sql); + if (!IsValidResult(res)) + { + Console.Write(sql.ToString() + " failure, "); + ExitProgram(); + } + else + { + Console.WriteLine(sql.ToString() + " success"); + + } + TDengine.FreeResult(res); + } + + public static void DisplayRes(IntPtr res) + { + if (!IsValidResult(res)) + { + ExitProgram(); + } + + List metas = GetResField(res); + int fieldCount = metas.Count; + // metas.ForEach((item) => { Console.Write("{0} ({1}) \t|\t", item.name, item.size); }); + + List datas = QueryRes(res, metas); + for (int index = 0; index < datas.Count; index++) + { + if (index % fieldCount == 0 && index != 0) + { + Console.WriteLine(""); + } + Console.Write("{0} \t|\t", datas[index].ToString()); + + } + Console.WriteLine(""); + } + + public static List> GetResultSet(IntPtr res) + { + List> result = new List>(); + List colName = new List(); + List dataRaw = new List(); + if (!IsValidResult(res)) + { + ExitProgram(); + } + + List metas = GetResField(res); + result.Add(colName); + + dataRaw = QueryRes(res, metas); + result.Add(dataRaw); + + if (TDengine.ErrorNo(res) != 0) + { + Console.Write("Query is not complete, Error {0:G}", TDengine.ErrorNo(res), TDengine.Error(res)); + } + return result; + } + + public static bool IsValidResult(IntPtr res) + { + if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0)) + { + if (res != IntPtr.Zero) + { + Console.Write("reason: " + TDengine.Error(res)); + return false; + } + Console.WriteLine(""); + return false; + } + return true; + } + public static void CloseConnection(IntPtr conn) + { + ExecuteUpdate(conn, $"drop database if exists {globalDbName}"); + if (conn != IntPtr.Zero) + { + if (TDengine.Close(conn) == 0) + { + Console.WriteLine("close connection sucess"); + } + else + { + Console.WriteLine("close Connection failed"); + } + } + } + public static List GetResField(IntPtr res) + { + List metas = TDengine.FetchFields(res); + return metas; + } + public static void ExitProgram() + { + TDengine.Cleanup(); + System.Environment.Exit(0); + } + public static List GetResData(IntPtr res) + { + List colName = new List(); + List dataRaw = new List(); + if (!IsValidResult(res)) + { + ExitProgram(); + } + List metas = GetResField(res); + dataRaw = QueryRes(res, metas); + return dataRaw; + } + + private static List QueryRes(IntPtr res, List metas) + { + IntPtr taosRow; + List dataRaw = new List(); + int fieldCount = metas.Count; + while ((taosRow = TDengine.FetchRows(res)) != IntPtr.Zero) + { + dataRaw.AddRange(FetchRow(taosRow,res)); + } + if (TDengine.ErrorNo(res) != 0) + { + Console.Write("Query is not complete, Error {0:G}", TDengine.ErrorNo(res), TDengine.Error(res)); + } + // TDengine.FreeResult(res); + Console.WriteLine(""); + return dataRaw; + } + + + public static List FetchRow(IntPtr taosRow, IntPtr taosRes)//, List metaList, int numOfFiled + { + List metaList = TDengine.FetchFields(taosRes); + int numOfFiled = TDengine.FieldCount(taosRes); + + List dataRaw = new List(); + + IntPtr colLengthPrt = TDengine.FetchLengths(taosRes); + int[] colLengthArr = new int[numOfFiled]; + Marshal.Copy(colLengthPrt, colLengthArr, 0, numOfFiled); + + for (int i = 0; i < numOfFiled; i++) + { + TDengineMeta meta = metaList[i]; + IntPtr data = Marshal.ReadIntPtr(taosRow, IntPtr.Size * i); + + if (data == IntPtr.Zero) + { + dataRaw.Add("NULL"); + continue; + } + switch ((TDengineDataType)meta.type) + { + case TDengineDataType.TSDB_DATA_TYPE_BOOL: + bool v1 = Marshal.ReadByte(data) == 0 ? false : true; + dataRaw.Add(v1); + break; + case TDengineDataType.TSDB_DATA_TYPE_TINYINT: + sbyte v2 = (sbyte)Marshal.ReadByte(data); + dataRaw.Add(v2); + break; + case TDengineDataType.TSDB_DATA_TYPE_SMALLINT: + short v3 = Marshal.ReadInt16(data); + dataRaw.Add(v3); + break; + case TDengineDataType.TSDB_DATA_TYPE_INT: + int v4 = Marshal.ReadInt32(data); + dataRaw.Add(v4); + break; + case TDengineDataType.TSDB_DATA_TYPE_BIGINT: + long v5 = Marshal.ReadInt64(data); + dataRaw.Add(v5); + break; + case TDengineDataType.TSDB_DATA_TYPE_FLOAT: + float v6 = (float)Marshal.PtrToStructure(data, typeof(float)); + dataRaw.Add(v6); + break; + case TDengineDataType.TSDB_DATA_TYPE_DOUBLE: + double v7 = (double)Marshal.PtrToStructure(data, typeof(double)); + dataRaw.Add(v7); + break; + case TDengineDataType.TSDB_DATA_TYPE_BINARY: + string v8 = Marshal.PtrToStringUTF8(data, colLengthArr[i]); + dataRaw.Add(v8); + break; + case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP: + long v9 = Marshal.ReadInt64(data); + dataRaw.Add(v9); + break; + case TDengineDataType.TSDB_DATA_TYPE_NCHAR: + string v10 = Marshal.PtrToStringUTF8(data, colLengthArr[i]); + dataRaw.Add(v10); + break; + case TDengineDataType.TSDB_DATA_TYPE_UTINYINT: + byte v12 = Marshal.ReadByte(data); + dataRaw.Add(v12.ToString()); + break; + case TDengineDataType.TSDB_DATA_TYPE_USMALLINT: + ushort v13 = (ushort)Marshal.ReadInt16(data); + dataRaw.Add(v13); + break; + case TDengineDataType.TSDB_DATA_TYPE_UINT: + uint v14 = (uint)Marshal.ReadInt32(data); + dataRaw.Add(v14); + break; + case TDengineDataType.TSDB_DATA_TYPE_UBIGINT: + ulong v15 = (ulong)Marshal.ReadInt64(data); + dataRaw.Add(v15); + break; + case TDengineDataType.TSDB_DATA_TYPE_JSONTAG: + string v16 = Marshal.PtrToStringUTF8(data, colLengthArr[i]); + dataRaw.Add(v16); + break; + default: + dataRaw.Add("nonsupport data type value"); + break; + } + + } + return dataRaw; + } + } +} + diff --git a/src/connector/C#/src/TDengineDriver/TDengineDriver.cs b/src/connector/C#/src/TDengineDriver/TDengineDriver.cs index c77e4a1488..d3bae49674 100644 --- a/src/connector/C#/src/TDengineDriver/TDengineDriver.cs +++ b/src/connector/C#/src/TDengineDriver/TDengineDriver.cs @@ -144,6 +144,43 @@ namespace TDengineDriver public int num; } + /// + /// User defined callback function for interface "QueryAsync()" + /// ,actually is a delegate in .Net. + /// This function aim to handel the taoRes which points to + /// the caller method's sql resultset. + /// + /// This parameter will sent by caller method (QueryAsync()). + /// This is the retrieved by caller method's sql. + /// 0 for indicate operation success and negative for operation fail. + public delegate void QueryAsyncCallback(IntPtr param, IntPtr taoRes, int code); + + /// + /// User defined callback function for interface "FetchRowAsync()" + /// ,actually is a delegate in .Net. + /// This callback allow applications to get each row of the + /// batch records by calling FetchRowAsync() forward iteration. + /// After reading all the records in a block, the application needs to continue calling + /// FetchRowAsync() in this callback function to obtain the next batch of records for + /// processing until the number of records + /// + /// The parameter passed by + /// Query status + /// The number of rows of data obtained (not a function of + /// the entire query result set). When the number is zero (the result is returned) + /// or the number of records is negative (the query fails). + public delegate void FetchRowAsyncCallback(IntPtr param, IntPtr taoRes, int numOfRows); + + /// + /// In asynchronous mode, the prototype of the callback function. + /// + /// Subscription object return by + /// Query retrieve result set. (Note there may be no record in the result set.) + /// Additional parameters supplied by the client when taos_subscribe is called. + /// Error code. + public delegate void SubscribeCallback(IntPtr subscribe, IntPtr tasRes, IntPtr param, int code); + public delegate void StreamOpenCallback(IntPtr param, IntPtr taosRes, IntPtr taosRow); + public delegate void StreamOpenCallback2(IntPtr ptr); public class TDengine { @@ -176,12 +213,12 @@ namespace TDengineDriver // static extern public IntPtr Query(IntPtr conn, string sqlstr); static extern private IntPtr Query(IntPtr conn, IntPtr byteArr); - static public IntPtr Query(IntPtr conn,string command) - { - IntPtr res = IntPtr.Zero; - + static public IntPtr Query(IntPtr conn, string command) + { + IntPtr res = IntPtr.Zero; + IntPtr commandBuffer = Marshal.StringToCoTaskMemUTF8(command); - res = Query(conn,commandBuffer); + res = Query(conn, commandBuffer); return res; } @@ -411,5 +448,146 @@ namespace TDengineDriver [DllImport("taos", EntryPoint = "taos_fetch_lengths", CallingConvention = CallingConvention.Cdecl)] static extern public IntPtr FetchLengths(IntPtr taos); + + // Async Query + /// + /// This API uses non-blocking call mode. + /// Application can open mutilple tables and manipulate(query or insert) opened table concurrently. + /// So applications must ensure that opetations on the same table is compeletly serialized. + /// Becuase that will cause some query and insert operations cannot be performed. + /// + /// A taos connection return by Connect() + /// sql command need to execute + /// User-defined callback function. + /// the parameter for callback + [DllImport("taos", EntryPoint = "taos_query_a", CallingConvention = CallingConvention.Cdecl)] + static extern public void QueryAsync(IntPtr taos, string sql, QueryAsyncCallback fq, IntPtr param); + + /// + /// Get the result set of asynchronous queries in batch, + /// which can only be used with QueryAsync().FetchRowAsyncCallback + /// + /// The result set returned when backcall QueryAsyncCallback + /// Callback function. + /// The parameter for callback FetchRowAsyncCallback + [DllImport("taos", EntryPoint = "taos_fetch_rows_a", CallingConvention = CallingConvention.Cdecl)] + static extern public void FetchRowAsync(IntPtr taoRes, FetchRowAsyncCallback fq, IntPtr param); + + // Subscribe + + /// + /// This function is used for start subscription service. + /// + /// taos connection return by + /// If the subscription is already exists, to decide whether to + /// start over or continue with previous subscription. + /// The name of the subscription.(This is the unique identification of the subscription). + /// The subscribe statement(select only).Only query original data and in positive time sequence. + /// The callback function when the query result is received. + /// Additional parameter when calling callback function. System API will pass it to + /// callback function without any operations.It is only used when calling asynchronously, + /// and this parameter should be passed to NULL when calling synchronously + /// Polling period in milliseconds. During asynchronous call, the callback function will be + /// called periodically according to this parameter; In order to avoid affecting system + /// performance, it is not recommended to set this parameter too small; When calling synchronously, + /// if the interval between two calls to taos_consume is less than this period, the API will block + /// until the interval exceeds this period. + /// Return null for failure, return subscribe object for success. + [DllImport("taos", EntryPoint = "taos_subscribe", CallingConvention = CallingConvention.Cdecl)] + static extern private IntPtr Subscribe(IntPtr taos, int restart, string topic, string sql, SubscribeCallback fq, IntPtr param, int interval); + static public IntPtr Subscribe(IntPtr taos, bool restart, string topic, string sql, SubscribeCallback fq, IntPtr param, int interval) + { + if (taos == IntPtr.Zero) + { + Console.WriteLine("taos connect is null,subscribe failed"); + throw new Exception("taos connect is null"); + } + else + { + IntPtr subPtr = Subscribe(taos, restart == true ? 1 : 0, topic, sql, fq, param, interval); + return subPtr; + } + } + + /// + /// Only synchronous mode, this function is used to get the result of subscription. + /// If the interval between two calls to taos_consume is less than the polling + /// cycle of the subscription, the API will block until the interval exceeds this + /// cycle. If a new record arrives in the database, the API will return the latest + /// record, otherwise it will return an empty result set with no records. + /// If the return value is NULL, it indicates a system error. + /// + /// Subscription object return by . + /// + [DllImport("taos", EntryPoint = "taos_consume", CallingConvention = CallingConvention.Cdecl)] + static extern private IntPtr TaosConsume(IntPtr subscribe); + static public IntPtr Consume(IntPtr subscribe) + { + IntPtr res = IntPtr.Zero; + if (subscribe == IntPtr.Zero) + { + Console.WriteLine("Object subscribe is null,please subscribe first."); + throw new Exception("Object subscribe is null"); + } + else + { + res = TaosConsume(subscribe); + } + return res; + } + + /// + /// Unsubscribe. + /// + /// Subscription object return by . + /// If it is not 0, the API will keep the progress of subscription, + /// and the and the subsequent call to taos_subscribe can continue + /// based on this progress; otherwise, the progress information will + /// be deleted and the data can only be read again. + /// + [DllImport("taos", EntryPoint = "taos_unsubscribe", CallingConvention = CallingConvention.Cdecl)] + static extern private void Unsubscribe(IntPtr subscribe, int keep); + static public void Unsubscribe(IntPtr subscribe, bool keep) + { + if (subscribe == IntPtr.Zero) + { + Console.WriteLine("subscribe is null, close Unsubscribe failed"); + throw new Exception("Object subscribe is null"); + } + else + { + Unsubscribe(subscribe, keep == true ? 1 : 0); + Console.WriteLine("Unsubscribe success."); + } + + } + // Stream + + /// + /// Used to open an stream, which can do continuous query. + /// + /// taos connection return by + /// Query statement( query only) + /// User defined callback. + /// The time when stream computing starts. If it is 0, it means starting from now. + /// If it is not zero, it means starting from the specified time (the number of + /// milliseconds from 1970/1/1 UTC time). + /// + /// First parameter provide by application for callback usage. + /// While callback,this parameter is provided to the application. + /// The second callback function which will be caled when the continuous query + /// stop automatically. + /// Return null indicate creation failed, not null for success. + [DllImport("taos", EntryPoint = "taos_open_stream", CallingConvention = CallingConvention.Cdecl)] + static extern public IntPtr OpenStream(IntPtr taos, string sql, StreamOpenCallback fp, Int64 stime, IntPtr param, StreamOpenCallback2 callback2); + + /// + /// Used too stop data flow. + /// Remember to stop data flow when you stopped steam computing. + /// + /// Value returned by + [DllImport("taos", EntryPoint = "taos_close_stream", CallingConvention = CallingConvention.Cdecl)] + static extern public void CloseStream(IntPtr stream); + } } diff --git a/src/connector/C#/src/TDengineDriver/TDengineDriver.csproj b/src/connector/C#/src/TDengineDriver/TDengineDriver.csproj index 5a11c10208..4a030046a1 100644 --- a/src/connector/C#/src/TDengineDriver/TDengineDriver.csproj +++ b/src/connector/C#/src/TDengineDriver/TDengineDriver.csproj @@ -1,7 +1,7 @@ - net5;netstandard2.0;net45 + net5;netstandard2.1; TDengine.Connector logo.jpg 1.0.4 -- GitLab