From 02d87f7c9b5a8f99b5a17d2a4d4d0ce8589e4816 Mon Sep 17 00:00:00 2001 From: MysticBoy Date: Mon, 2 Nov 2020 20:16:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E4=BA=86=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E5=8E=86=E5=8F=B2=20=E5=92=8C=20=E6=9F=A5=E8=AF=A2=20=20?= =?UTF-8?q?=E6=9C=80=E6=96=B0=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- IoTSharp/Storage/InfluxDBStorage.cs | 89 +++++++++++++++++---------- IoTSharp/Storage/InfluxDBV1Storage.cs | 13 +--- 2 files changed, 58 insertions(+), 44 deletions(-) diff --git a/IoTSharp/Storage/InfluxDBStorage.cs b/IoTSharp/Storage/InfluxDBStorage.cs index 27bcddc8..73d5fbf1 100644 --- a/IoTSharp/Storage/InfluxDBStorage.cs +++ b/IoTSharp/Storage/InfluxDBStorage.cs @@ -1,6 +1,7 @@ using hyjiacan.py4n; using InfluxDB.Client; using InfluxDB.Client.Api.Domain; +using InfluxDB.Client.Core.Flux.Domain; using InfluxDB.Client.Writes; using IoTSharp.Data; using IoTSharp.Dtos; @@ -41,38 +42,33 @@ namespace IoTSharp.Storage { InfluxDBClient _taos = _taospool.Get(); var query = _taos.GetQueryApi(); - var v= query.QueryAsync( @$" + var v = query.QueryAsync(@$" from(bucket: ""iotsharp-bucket"") -|> range(start: -10h) +|> range(start: -72h) |> filter(fn: (r) => r[""_measurement""] == ""TelemetryData"") |> filter(fn: (r) => r[""DeviceId""] == ""{deviceId}"") - |> last()").GetAwaiter().GetResult(); - List dt = new List (); - v.ForEach(ft => - { - ft.Records.ForEach(fr => - { - dt.Add(new TelemetryDataDto() - { - KeyName = fr.GetField(), - DateTime = fr.GetTimeInDateTime().GetValueOrDefault(), - Value = fr.GetValue() - }); - }); - }); + |> last()"); + _taospool.Return(_taos); - return Task.FromResult(dt); + return FluxToDtoAsync(v); } - + public Task> GetTelemetryLatest(Guid deviceId, string keys) { InfluxDBClient _taos = _taospool.Get(); - IEnumerable kvs = from k in keys - select $" keyname = '{k}' "; - string sql = $"select last_row(*) from telemetrydata where deviceid='{deviceId:N}' and ({string.Join("or", kvs) }) group by deviceid,keyname"; - List dt = null;// SqlToTDD(_taos, sql, "last_row(", ")", string.Empty); + var query = _taos.GetQueryApi(); + var kvs = from k in keys.Split(';', ',') + select $"r[\"_field\"] == \"{k}\""; + var v = query.QueryAsync(@$" +from(bucket: ""iotsharp-bucket"") +|> range(start: -72h) +|> filter(fn: (r) => r[""_measurement""] == ""TelemetryData"") +|> filter(fn: (r) => r[""DeviceId""] == ""{deviceId}"") +|> filter(fn: (r) => {string.Join(" or ", kvs)}) +|> group(columns: [""_field""]) +|> last()"); _taospool.Return(_taos); - return Task.FromResult(dt); + return FluxToDtoAsync(v); } @@ -85,13 +81,37 @@ from(bucket: ""iotsharp-bucket"") public Task> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin, DateTime end) { InfluxDBClient _taos = _taospool.Get(); - IEnumerable kvs = from k in keys - select $" keyname = '{k}' "; - string sql = $"select tbname,keyname from telemetrydata where deviceid='{deviceId:N}' and ({string.Join("or", kvs) }) "; - List dt = null;// SQLToDTByDate(begin, end, _taos, sql); + var query = _taos.GetQueryApi(); + var kvs = from k in keys.Split(';',',') + select $"r[\"_field\"] == \"{k}\""; + var v = query.QueryAsync(@$" +from(bucket: ""iotsharp-bucket"") +|> range(start: {begin:o},stop:{end:o}) +|> filter(fn: (r) => r[""_measurement""] == ""TelemetryData"") +|> filter(fn: (r) => r[""DeviceId""] == ""{deviceId}"") +|> filter(fn: (r) => {string.Join(" or ", kvs)}) +|> group(columns: [""_field""]) +|> yield()"); _taospool.Return(_taos); - return Task.FromResult(dt); - + return FluxToDtoAsync(v); + } + + private async Task> FluxToDtoAsync(Task< List> v) + { + List dt = new List(); + (await v)?.ForEach(ft => + { + ft.Records.ForEach(fr => + { + dt.Add(new TelemetryDataDto() + { + KeyName = fr.GetField(), + DateTime = fr.GetTimeInDateTime().GetValueOrDefault(), + Value = fr.GetValue() + }); + }); + }); + return dt; } public Task> LoadTelemetryAsync(Guid deviceId, DateTime begin) @@ -102,10 +122,15 @@ from(bucket: ""iotsharp-bucket"") public Task> LoadTelemetryAsync(Guid deviceId, DateTime begin, DateTime end) { InfluxDBClient _taos = _taospool.Get(); - string sql = $"select tbname,keyname from telemetrydata where deviceid='{deviceId:N}'"; - List dt = null; //SQLToDTByDate(begin, end, _taos, sql); + var query = _taos.GetQueryApi(); + var v = query.QueryAsync(@$" +from(bucket: ""iotsharp-bucket"") +|> range(start: {begin:o},stop:{end:o}) +|> filter(fn: (r) => r[""_measurement""] == ""TelemetryData"") +|> filter(fn: (r) => r[""DeviceId""] == ""{deviceId}"") +|> yield()"); _taospool.Return(_taos); - return Task.FromResult(dt); + return FluxToDtoAsync(v); } public async Task StoreTelemetryAsync(RawMsg msg) diff --git a/IoTSharp/Storage/InfluxDBV1Storage.cs b/IoTSharp/Storage/InfluxDBV1Storage.cs index 2b24d0c0..5e92d9fd 100644 --- a/IoTSharp/Storage/InfluxDBV1Storage.cs +++ b/IoTSharp/Storage/InfluxDBV1Storage.cs @@ -59,18 +59,7 @@ namespace IoTSharp.Storage return Task.FromResult(dt); } - //private List SQLToDTByDate(DateTime begin, DateTime end, TaosConnection db, string sql) - //{ - // List dt = new List(); - // List<(string tbname, string keyname)> list = db.CreateCommand(sql).ExecuteReader().ToList<(string tbname, string keyname)>(); - // foreach ((string tbname, string keyname) item in list) - // { - // string susql = $" select * from {item.tbname} where ts >={begin:yyyy-MM-dd HH:mm:ss.fff} and ts <={end:yyyy-MM-dd HH:mm:ss.fff}"; - // List dtx = SqlToTDD(db, susql, "", "", item.keyname); - // dt.AddRange(dtx); - // } - // return dt; - //} + public Task> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin) { return LoadTelemetryAsync(deviceId, keys, begin, DateTime.Now); -- GitLab