diff --git a/src/connector/C#/src/test/FunctionTest/Subscribe.cs b/src/connector/C#/src/test/FunctionTest/Subscribe.cs new file mode 100644 index 0000000000000000000000000000000000000000..57ded4a82f530b759b014f8109cd63246f2dfc3b --- /dev/null +++ b/src/connector/C#/src/test/FunctionTest/Subscribe.cs @@ -0,0 +1,184 @@ +using TDengineDriver; +using Test.UtilsTools; +using System; +using System.Collections.Generic; +using Xunit; +using Test.UtilsTools.DataSource; +using System.Threading; +using Xunit.Abstractions; +using Test.Fixture; +using Test.Case.Attributes; + +namespace Cases +{ + [TestCaseOrderer("XUnit.Case.Orderers.TestExeOrderer", "Cases.ExeOrder")] + [Collection("Database collection")] + + public class SubscribeCases + { + DatabaseFixture database; + + private readonly ITestOutputHelper output; + + public SubscribeCases(DatabaseFixture fixture, ITestOutputHelper output) + { + this.database = fixture; + this.output = output; + } + /// xiaolei + /// SubscribeCases.ConsumeFromBegin + /// Subscribe a table and consume from beginning. + /// Subscribe.cs + /// pass or failed + [Fact(DisplayName = "SubscribeCases.ConsumeFromBegin()"), TestExeOrder(1), Trait("Category", "Without callback")] + public void ConsumeFromBegin() + { + IntPtr conn = database.conn; + IntPtr _res = IntPtr.Zero; + + var tableName = "subscribe_from_begin"; + var createSql = $"create table if not exists {tableName}(ts timestamp,bl bool,i8 tinyint,i16 smallint,i32 int,i64 bigint,bnr binary(50),nchr nchar(50))tags(t_i32 int,t_bnr binary(50),t_nchr nchar(50))"; + var dropSql = $"drop table if exists {tableName}"; + + var colData = new List{1646150410100,true,1,11,1111,11111111,"value one","值壹", + 1646150410200,true,2,22,2222,22222222,"value two","值贰", + 1646150410300,false,3,33,3333,33333333,"value three","值三", + }; + + var colData2 = new List{1646150410400,false,4,44,4444,44444444,"value three","值肆", + 1646150410500,true,5,55,5555,55555555,"value one","值伍", + 1646150410600,true,6,66,6666,66666666,"value two","值陆", + }; + + var tagData = new List { 1, "tag_one", "标签壹" }; + var tagData2 = new List { 2, "tag_two", "标签贰" }; + + String insertSql = UtilsTools.ConstructInsertSql(tableName + "_s01", tableName, colData, tagData, 3); + String insertSql2 = UtilsTools.ConstructInsertSql(tableName + "_s02", tableName, colData2, tagData2, 3); + // Then + List expectResMeta = DataSource.GetMetaFromDDL(createSql); + List expectResData = UtilsTools.CombineColAndTagData(colData, tagData, 3); + List expectResData2 = UtilsTools.CombineColAndTagData(colData2, tagData2, 3); + expectResData.AddRange(expectResData2); + + var querySql = $"select * from {tableName}"; + UtilsTools.ExecuteUpdate(conn, dropSql); + UtilsTools.ExecuteUpdate(conn, createSql); + UtilsTools.ExecuteUpdate(conn, insertSql); + + + IntPtr subscribe = TDengine.Subscribe(conn, true, tableName, querySql, null, IntPtr.Zero, 0); + _res = TDengine.Consume(subscribe); + // need to call fetch TAOS_RES + UtilsTools.GetResDataWithoutFree(_res); + TDengine.Unsubscribe(subscribe, true); + + UtilsTools.ExecuteUpdate(conn, insertSql2); + Thread.Sleep(100); + + + subscribe = TDengine.Subscribe(conn, true, tableName, querySql, null, IntPtr.Zero, 0); + _res = TDengine.Consume(subscribe); + + List actualMeta = UtilsTools.GetResField(_res); + List actualResData = UtilsTools.GetResDataWithoutFree(_res); + TDengine.Unsubscribe(subscribe, false); + + Assert.Equal(expectResData.Count, actualResData.Count); + output.WriteLine("Assert Meta data"); + //Assert Meta data + for (int i = 0; i < actualMeta.Count; i++) + { + Assert.Equal(expectResMeta[i].name, actualMeta[i].name); + Assert.Equal(expectResMeta[i].type, actualMeta[i].type); + Assert.Equal(expectResMeta[i].size, actualMeta[i].size); + } + output.WriteLine("Assert retrieve data"); + // Assert retrieve data + for (int i = 0; i < actualResData.Count; i++) + { + // output.WriteLine("{0},{1},{2}", i, expectResData[i], actualResData[i]); + Assert.Equal(expectResData[i].ToString(), actualResData[i]); + } + + } + + /// xiaolei + /// SubscribeCases.ConsumeFromLastProgress + /// Subscribe table from the last progress. + /// Subscribe.cs + /// pass or failed + [Fact(DisplayName = "SubscribeCases.ConsumeFromLastProgress()"), TestExeOrder(2), Trait("Category", "Without callback")] + public void ConsumeFromLastProgress() + { + IntPtr conn = database.conn; + IntPtr _res = IntPtr.Zero; + + var tableName = "subscribe_from_progress"; + var createSql = $"create table if not exists {tableName}(ts timestamp,bl bool,i8 tinyint,i16 smallint,i32 int,i64 bigint,bnr binary(50),nchr nchar(50))tags(t_i32 int,t_bnr binary(50),t_nchr nchar(50))"; + var dropSql = $"drop table if exists {tableName}"; + + var colData = new List{1646150410100,true,1,11,1111,11111111,"value one","值壹", + 1646150410200,true,2,22,2222,22222222,"value two","值贰", + 1646150410300,false,3,33,3333,33333333,"value three","值叁", + }; + + var colData2 = new List{1646150410400,false,4,44,4444,44444444,"value three","值肆", + 1646150410500,true,5,55,5555,55555555,"value one","值伍", + 1646150410600,true,6,66,6666,66666666,"value two","值陆", + }; + + var tagData = new List { 1, "tag_one", "标签壹" }; + var tagData2 = new List { 2, "tag_two", "标签贰" }; + + String insertSql = UtilsTools.ConstructInsertSql(tableName + "_s01", tableName, colData, tagData, 3); + String insertSql2 = UtilsTools.ConstructInsertSql(tableName + "_s02", tableName, colData2, tagData2, 3); + // Then + List expectResMeta = DataSource.GetMetaFromDDL(createSql); + List expectResData = UtilsTools.CombineColAndTagData(colData, tagData, 3); + List expectResData2 = UtilsTools.CombineColAndTagData(colData2, tagData2, 3); + + + var querySql = $"select * from {tableName}"; + UtilsTools.ExecuteUpdate(conn, dropSql); + UtilsTools.ExecuteUpdate(conn, createSql); + UtilsTools.ExecuteUpdate(conn, insertSql); + + // First time subscribe + IntPtr subscribe = TDengine.Subscribe(conn, true, tableName, querySql, null, IntPtr.Zero, 20); + _res = TDengine.Consume(subscribe); + // need to call fetch TAOS_RES + UtilsTools.GetResDataWithoutFree(_res); + // Close subscribe and save progress. + TDengine.Unsubscribe(subscribe, true); + + // Insert new data. + UtilsTools.ExecuteUpdate(conn, insertSql2); + Thread.Sleep(1000); + + subscribe = TDengine.Subscribe(conn, false, tableName, querySql, null, IntPtr.Zero, 20); + _res = TDengine.Consume(subscribe); + + List actualMeta = UtilsTools.GetResField(_res); + List actualResData = UtilsTools.GetResDataWithoutFree(_res); + TDengine.Unsubscribe(subscribe, true); + output.WriteLine("Assert Meta data"); + //Assert Meta data + for (int i = 0; i < actualMeta.Count; i++) + { + Assert.Equal(expectResMeta[i].name, actualMeta[i].name); + Assert.Equal(expectResMeta[i].type, actualMeta[i].type); + Assert.Equal(expectResMeta[i].size, actualMeta[i].size); + } + output.WriteLine("Assert retrieve data"); + // Assert retrieve data + for (int i = 0; i < actualResData.Count; i++) + { + // output.WriteLine("{0},{1},{2}", i, expectResData[i], actualResData[i]); + Assert.Equal(expectResData2[i].ToString(), actualResData[i]); + } + + } + } + +} \ No newline at end of file diff --git a/src/connector/C#/src/test/FunctionTest/SubscribeAsync.cs b/src/connector/C#/src/test/FunctionTest/SubscribeAsync.cs new file mode 100644 index 0000000000000000000000000000000000000000..6a954ef0e048331fdf9809e0030240cd80fa1d6f --- /dev/null +++ b/src/connector/C#/src/test/FunctionTest/SubscribeAsync.cs @@ -0,0 +1,237 @@ +using TDengineDriver; +using Test.UtilsTools; +using System; +using System.Collections.Generic; +using Xunit; +using Test.UtilsTools.DataSource; +using System.Threading; +using Xunit.Abstractions; +using Test.Fixture; +using Test.Case.Attributes; + +namespace Cases +{ + [TestCaseOrderer("XUnit.Case.Orderers.TestExeOrderer", "Cases.ExeOrder")] + [Collection("Database collection")] + public class SubscribeAsyncCases + { + DatabaseFixture database; + + private readonly ITestOutputHelper output; + + public SubscribeAsyncCases(DatabaseFixture fixture, ITestOutputHelper output) + { + this.database = fixture; + this.output = output; + } + + /// xiaolei + /// SubscribeAsyncCases.ConsumeFromBegin + /// Subscribe a table and consume through callback and the beginning record of the table + /// Subscribe.cs + /// pass or failed + [Fact(DisplayName = "SubscribeAsyncCases.ConsumeFromBegin()"), TestExeOrder(1), Trait("Category", "With callback")] + public void ConsumeFromBegin() + { + IntPtr conn = database.conn; + IntPtr _res = IntPtr.Zero; + + var tableName = "subscribe_async_from_begin"; + var createSql = $"create table if not exists {tableName}(ts timestamp,bl bool,i8 tinyint,i16 smallint,i32 int,i64 bigint,bnr binary(50),nchr nchar(50))tags(t_i32 int,t_bnr binary(50),t_nchr nchar(50))"; + var dropSql = $"drop table if exists {tableName}"; + + var colData = new List{1646150410100,true,1,11,1111,11111111,"value one","值壹", + 1646150410200,true,2,22,2222,22222222,"value two","值贰", + 1646150410300,false,3,33,3333,33333333,"value three","值三", + }; + + var colData2 = new List{1646150410400,false,4,44,4444,44444444,"value three","值肆", + 1646150410500,true,5,55,5555,55555555,"value one","值伍", + 1646150410600,true,6,66,6666,66666666,"value two","值陆", + }; + + var tagData = new List { 1, "tag_one", "标签壹" }; + var tagData2 = new List { 2, "tag_two", "标签贰" }; + + String insertSql = UtilsTools.ConstructInsertSql(tableName + "_s01", tableName, colData, tagData, 3); + String insertSql2 = UtilsTools.ConstructInsertSql(tableName + "_s02", tableName, colData2, tagData2, 3); + List expectResMeta = DataSource.GetMetaFromDDL(createSql); + List expectResData = UtilsTools.CombineColAndTagData(colData, tagData, 3); + List expectResData2 = UtilsTools.CombineColAndTagData(colData2, tagData2, 3); + expectResData.AddRange(expectResData2); + var querySql = $"select * from {tableName}"; + + UtilsTools.ExecuteUpdate(conn, dropSql); + UtilsTools.ExecuteUpdate(conn, createSql); + UtilsTools.ExecuteUpdate(conn, insertSql); + + SubscribeCallback subscribeCallback1 = new SubscribeCallback(SubCallback1); + SubscribeCallback subscribeCallback2 = new SubscribeCallback(SubCallback2); + IntPtr subscribe = TDengine.Subscribe(conn, true, tableName, querySql, subscribeCallback1, IntPtr.Zero, 200); + + UtilsTools.ExecuteUpdate(conn, insertSql2); + Thread.Sleep(1000); + TDengine.Unsubscribe(subscribe, true); + + subscribe = TDengine.Subscribe(conn, true, tableName, querySql, subscribeCallback2, IntPtr.Zero, 200); + Thread.Sleep(1000); + TDengine.Unsubscribe(subscribe, false); + void SubCallback1(IntPtr subscribe, IntPtr taosRes, IntPtr param, int code) + { + if (code == 0 && taosRes != IntPtr.Zero) + { + // cannot free taosRes using taosRes, otherwise will cause crash. + UtilsTools.GetResDataWithoutFree(taosRes); + } + else + { + output.WriteLine($"async query data failed, failed code:{code}, reason:{TDengine.Error(taosRes)}"); + } + + } + + void SubCallback2(IntPtr subscribe, IntPtr taosRes, IntPtr param, int code) + { + if (code == 0 && taosRes != IntPtr.Zero) + { + List actualMeta = UtilsTools.GetResField(taosRes); + List actualResData = UtilsTools.GetResDataWithoutFree(taosRes); + // UtilsTools.DisplayRes(taosRes); + if (actualResData.Count == 0) + { + output.WriteLine($"consume in subscribe callback without data"); + } + else + { + output.WriteLine($"consume in subscribe callback with data"); + + Assert.Equal(expectResData.Count, actualResData.Count); + output.WriteLine("Assert Meta data"); + //Assert Meta data + for (int i = 0; i < actualMeta.Count; i++) + { + Assert.Equal(expectResMeta[i].name, actualMeta[i].name); + Assert.Equal(expectResMeta[i].type, actualMeta[i].type); + Assert.Equal(expectResMeta[i].size, actualMeta[i].size); + } + output.WriteLine("Assert retrieve data"); + // Assert retrieve data + for (int i = 0; i < actualResData.Count; i++) + { + // output.WriteLine("index:{0},expectResData:{1},actualResData:{2}", i, expectResData[i], actualResData[i]); + Assert.Equal(expectResData[i].ToString(), actualResData[i]); + } + } + } + else + { + output.WriteLine($"async query data failed, failed code:{code}, reason:{TDengine.Error(taosRes)}"); + } + } + + } + + /// xiaolei + /// SubscribeAsyncCases.ConsumeFromLastProgress + /// Subscribe a table and consume through callback and from last consume progress. + /// Subscribe.cs + /// pass or failed + [Fact(DisplayName = "SubscribeAsyncCases.ConsumeFromLastProgress()"), TestExeOrder(2), Trait("Category", "With callback")] + public void ConsumeFromLastProgress() + { + IntPtr conn = database.conn; + IntPtr _res = IntPtr.Zero; + + var tableName = "subscribe_async_from_begin"; + var createSql = $"create table if not exists {tableName}(ts timestamp,bl bool,i8 tinyint,i16 smallint,i32 int,i64 bigint,bnr binary(50),nchr nchar(50))tags(t_i32 int,t_bnr binary(50),t_nchr nchar(50))"; + var dropSql = $"drop table if exists {tableName}"; + + var colData = new List{1646150410100,true,1,11,1111,11111111,"value one","值壹", + 1646150410200,true,2,22,2222,22222222,"value two","值贰", + 1646150410300,false,3,33,3333,33333333,"value three","值三", + }; + + var colData2 = new List{1646150410400,false,4,44,4444,44444444,"value three","值肆", + 1646150410500,true,5,55,5555,55555555,"value one","值伍", + 1646150410600,true,6,66,6666,66666666,"value two","值陆", + }; + + var tagData = new List { 1, "tag_one", "标签壹" }; + var tagData2 = new List { 2, "tag_two", "标签贰" }; + + String insertSql = UtilsTools.ConstructInsertSql(tableName + "_s01", tableName, colData, tagData, 3); + String insertSql2 = UtilsTools.ConstructInsertSql(tableName + "_s02", tableName, colData2, tagData2, 3); + List expectResMeta = DataSource.GetMetaFromDDL(createSql); + List expectResData = UtilsTools.CombineColAndTagData(colData, tagData, 3); + List expectResData2 = UtilsTools.CombineColAndTagData(colData2, tagData2, 3); + var querySql = $"select * from {tableName}"; + + UtilsTools.ExecuteUpdate(conn, dropSql); + UtilsTools.ExecuteUpdate(conn, createSql); + UtilsTools.ExecuteUpdate(conn, insertSql); + + SubscribeCallback subscribeCallback1 = new SubscribeCallback(SubCallback1); + SubscribeCallback subscribeCallback2 = new SubscribeCallback(SubCallback2); + IntPtr subscribe = TDengine.Subscribe(conn, true, tableName, querySql, subscribeCallback1, IntPtr.Zero, 200); + Thread.Sleep(1000); + TDengine.Unsubscribe(subscribe, true); + UtilsTools.ExecuteUpdate(conn, insertSql2); + subscribe = TDengine.Subscribe(conn, false, tableName, querySql, subscribeCallback2, IntPtr.Zero, 200); + Thread.Sleep(1000); + TDengine.Unsubscribe(subscribe, false); + void SubCallback1(IntPtr subscribe, IntPtr taosRes, IntPtr param, int code) + { + if (code == 0 && taosRes != IntPtr.Zero) + { + // cannot free taosRes using taosRes, otherwise will cause crash. + UtilsTools.GetResDataWithoutFree(taosRes); + } + else if (taosRes != IntPtr.Zero) + { + output.WriteLine($"async query data failed, failed code:{code}, reason:{TDengine.Error(taosRes)}"); + } + + } + + void SubCallback2(IntPtr subscribe, IntPtr taosRes, IntPtr param, int code) + { + if (code == 0 && taosRes != IntPtr.Zero) + { + List actualMeta = UtilsTools.GetResField(taosRes); + List actualResData = UtilsTools.GetResDataWithoutFree(taosRes); + UtilsTools.DisplayRes(taosRes); + if (actualResData.Count == 0) + { + output.WriteLine($"consume in subscribe callback without data"); + } + else + { + output.WriteLine($"consume in subscribe callback with data"); + + Assert.Equal(expectResData2.Count, actualResData.Count); + output.WriteLine("Assert Meta data"); + //Assert Meta data + for (int i = 0; i < actualMeta.Count; i++) + { + Assert.Equal(expectResMeta[i].name, actualMeta[i].name); + Assert.Equal(expectResMeta[i].type, actualMeta[i].type); + Assert.Equal(expectResMeta[i].size, actualMeta[i].size); + } + output.WriteLine("Assert retrieve data"); + // Assert retrieve data + for (int i = 0; i < actualResData.Count; i++) + { + // output.WriteLine("index:{0},expectResData:{1},actualResData:{2}", i, expectResData[i], actualResData[i]); + Assert.Equal(expectResData2[i].ToString(), actualResData[i]); + } + } + } + else + { + output.WriteLine($"async query data failed, failed code:{code}, reason:{TDengine.Error(taosRes)}"); + } + } + + } + } +} \ No newline at end of file diff --git a/src/connector/C#/src/test/FunctionTest/lib/DBFixture.cs b/src/connector/C#/src/test/FunctionTest/lib/DBFixture.cs index 83492536fe7d3ce9eb012282db2cd4979b6b03f0..295726a525fb7a6c7740d5216fa19ee797109eb1 100644 --- a/src/connector/C#/src/test/FunctionTest/lib/DBFixture.cs +++ b/src/connector/C#/src/test/FunctionTest/lib/DBFixture.cs @@ -48,30 +48,30 @@ namespace Test.Fixture public void Dispose() { - // IntPtr res; - // if (conn != IntPtr.Zero) - // { - // if ((res = TDengine.Query(conn, $"drop database if exists {db}")) != IntPtr.Zero) - // { - // if (TDengine.Close(conn) == 0) - // { - // Console.WriteLine("close connection success"); - // } - // else - // { - // throw new Exception("close connection failed"); - // } + IntPtr res; + if (conn != IntPtr.Zero) + { + if ((res = TDengine.Query(conn, $"drop database if exists {db}")) != IntPtr.Zero) + { + if (TDengine.Close(conn) == 0) + { + Console.WriteLine("close connection success"); + } + else + { + throw new Exception("close connection failed"); + } - // } - // else - // { - // throw new Exception(TDengine.Error(res)); - // } - // } - // else - // { - // throw new Exception("connection if already null"); - // } + } + else + { + throw new Exception(TDengine.Error(res)); + } + } + else + { + throw new Exception("connection if already null"); + } } diff --git a/src/connector/C#/src/test/FunctionTest/lib/Utils.cs b/src/connector/C#/src/test/FunctionTest/lib/Utils.cs index 93da6b77cf8132c3471bea3f673f6ebee96a3679..9604a0b043fc7b99ee4e58208ba1ff5212453fc5 100644 --- a/src/connector/C#/src/test/FunctionTest/lib/Utils.cs +++ b/src/connector/C#/src/test/FunctionTest/lib/Utils.cs @@ -104,11 +104,12 @@ namespace Test.UtilsTools IntPtr rowdata; List dataList = QueryRes(res, metaList); - for (int i = 0; i < metaList.Count; i++) + for (int i = 0; i < dataList.Count; i += metaList.Count) { - for (int j = 0; j < dataList.Count; j++) + for (int j = 0; j < metaList.Count; j++) { - Console.Write(" {0} \t|", dataList[j]); + Console.Write(" {0} \t|", dataList[i + j]); + } Console.WriteLine(""); }