From 5c75b9b0d15599426859ba14e81e27537abf39aa Mon Sep 17 00:00:00 2001 From: MysticBoy Date: Tue, 4 Jun 2019 22:57:16 +0800 Subject: [PATCH] Subscribe to attribute updates from the server --- .gitignore | 1 + Clients/MQTTClient/Program.cs | 7 + IoTSharp/Handlers/MqttEventsHandler.cs | 204 +++++++++++++++++-------- 3 files changed, 152 insertions(+), 60 deletions(-) diff --git a/.gitignore b/.gitignore index 30e0ff60..f5e73312 100644 --- a/.gitignore +++ b/.gitignore @@ -330,3 +330,4 @@ ASALocalRun/ .mfractor/ /healthchecksdb /.vscode +/Clients/MQTTClient/.vscode diff --git a/Clients/MQTTClient/Program.cs b/Clients/MQTTClient/Program.cs index 260ba90e..04de139f 100644 --- a/Clients/MQTTClient/Program.cs +++ b/Clients/MQTTClient/Program.cs @@ -19,6 +19,11 @@ namespace MQTTClient .WithTcpServer("localhost") .WithCredentials("3cb97cd31fbc40b08d12ec47a6fad622")//token .Build(); + client.UseApplicationMessageReceivedHandler(ax=> + { + Console.WriteLine($"ClientId{ ax.ClientId},msg={ax.ApplicationMessage.ConvertPayloadToString()}"); + }); + Task.Run(async () => { await client.ConnectAsync(options); @@ -35,6 +40,8 @@ namespace MQTTClient Console.WriteLine(message.ConvertPayloadToString()); await client.PublishAsync(message); await Task.Delay(TimeSpan.FromSeconds(10)); + await client.SubscribeAsync("/devices/me/attributes/response/+"); + await client.PublishAsync("/devices/me/attributes/request/1", "{\"anySide\":\"Doublevalue,longvalue,Doublevalue,longvalue\"}"); } while (Console.ReadKey().Key != ConsoleKey.Escape); await client.DisconnectAsync(); }).Wait(); diff --git a/IoTSharp/Handlers/MqttEventsHandler.cs b/IoTSharp/Handlers/MqttEventsHandler.cs index b869b5fa..87aba912 100644 --- a/IoTSharp/Handlers/MqttEventsHandler.cs +++ b/IoTSharp/Handlers/MqttEventsHandler.cs @@ -48,96 +48,180 @@ namespace IoTSharp.Handlers } Dictionary lstTopics = new Dictionary(); long received = 0; - internal void Server_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) { - _logger.LogInformation($"Server received {e.ClientId}'s message: Topic=[{e.ApplicationMessage.Topic }],Retain=[{e.ApplicationMessage.Retain}],QualityOfServiceLevel=[{e.ApplicationMessage.QualityOfServiceLevel}]"); - if (!lstTopics.ContainsKey(e.ApplicationMessage.Topic)) + if (string.IsNullOrEmpty(e.ClientId)) { - lstTopics.Add(e.ApplicationMessage.Topic, 1); - Task.Run(() => _serverEx.PublishAsync("$SYS/broker/subscriptions/count", lstTopics.Count.ToString())); + _logger.LogInformation($"Message: Topic=[{e.ApplicationMessage.Topic }]"); } else { - lstTopics[e.ApplicationMessage.Topic]++; - } - if (e.ApplicationMessage.Payload != null) - { - received += e.ApplicationMessage.Payload.Length; - } - string topic = e.ApplicationMessage.Topic; - var tpary = topic.Split('/', StringSplitOptions.RemoveEmptyEntries); - if (tpary.Length >= 3 && tpary[0] == "devices" && Devices.ContainsKey(e.ClientId)) - { - Device device = JudgeOrCreateNewDevice(tpary, Devices[e.ClientId]); - if (device != null) + _logger.LogInformation($"Server received {e.ClientId}'s message: Topic=[{e.ApplicationMessage.Topic }],Retain=[{e.ApplicationMessage.Retain}],QualityOfServiceLevel=[{e.ApplicationMessage.QualityOfServiceLevel}]"); + if (!lstTopics.ContainsKey(e.ApplicationMessage.Topic)) + { + lstTopics.Add(e.ApplicationMessage.Topic, 1); + Task.Run(() => _serverEx.PublishAsync("$SYS/broker/subscriptions/count", lstTopics.Count.ToString())); + } + else + { + lstTopics[e.ApplicationMessage.Topic]++; + } + if (e.ApplicationMessage.Payload != null) { - Dictionary keyValues = new Dictionary(); - if (tpary.Length >= 4) + received += e.ApplicationMessage.Payload.Length; + } + string topic = e.ApplicationMessage.Topic; + var tpary = topic.Split('/', StringSplitOptions.RemoveEmptyEntries); + if (tpary.Length >= 3 && tpary[0] == "devices" && Devices.ContainsKey(e.ClientId)) + { + Device device = JudgeOrCreateNewDevice(tpary, Devices[e.ClientId]); + if (device != null) { - string keyname = tpary.Length >= 5 ? tpary[4] : tpary[3]; - if (tpary[3].ToLower() == "xml") + Dictionary keyValues = new Dictionary(); + if (tpary.Length >= 4) { - try + string keyname = tpary.Length >= 5 ? tpary[4] : tpary[3]; + if (tpary[3].ToLower() == "xml") { - var xml = new System.Xml.XmlDocument(); - xml.LoadXml(e.ApplicationMessage.ConvertPayloadToString()); - keyValues.Add(keyname, xml); + try + { + var xml = new System.Xml.XmlDocument(); + xml.LoadXml(e.ApplicationMessage.ConvertPayloadToString()); + keyValues.Add(keyname, xml); + } + catch (Exception ex) + { + _logger.LogWarning(ex, $"xml data error {topic},{ex.Message}"); + } } - catch (Exception ex) + else if (tpary[3].ToLower() == "binary") { - _logger.LogWarning(ex, $"xml data error {topic},{ex.Message}"); + keyValues.Add(keyname, e.ApplicationMessage.Payload); } } - else if (tpary[3].ToLower() == "binary") - { - keyValues.Add(keyname, e.ApplicationMessage.Payload); - } - } - else - { - try - { - keyValues = e.ApplicationMessage.ConvertPayloadToDictionary(); - } - catch (Exception ex) - { - _logger.LogWarning(ex, $"ConvertPayloadToDictionary Error {topic},{ex.Message}"); - } - } - if (tpary[2] == "telemetry") - { - Task.Run(async () => + else { try { - var result = await _dbContext.SaveAsync(keyValues, device, DataSide.ClientSide); + keyValues = e.ApplicationMessage.ConvertPayloadToDictionary(); } catch (Exception ex) { - _logger.LogError(ex, $"Can't upload telemetry to device {device.Name}({device.Id}).the payload is {e.ApplicationMessage.ConvertPayloadToString()}"); + _logger.LogWarning(ex, $"ConvertPayloadToDictionary Error {topic},{ex.Message}"); } - }); - } - else if (tpary[2] == "attributes") - { - Task.Run(async () => + } + if (tpary[2] == "telemetry") { - try + Task.Run(async () => { - - var result = await _dbContext.SaveAsync(keyValues, device, DataSide.ClientSide); + try + { + var result = await _dbContext.SaveAsync(keyValues, device, DataSide.ClientSide); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Can't upload telemetry to device {device.Name}({device.Id}).the payload is {e.ApplicationMessage.ConvertPayloadToString()}"); + } + }); + } + else if (tpary[2] == "attributes") + { + if (tpary.Length > 3 && tpary[3] == "request") + { + Task.Run(async () => + { + await RequestAttributes(tpary, e.ApplicationMessage.ConvertPayloadToDictionary(), device); + }); } - catch (Exception ex) + else { - _logger.LogError(ex, $"Can't upload attributes to device {device.Name}({device.Id}).the payload is \"{e.ApplicationMessage.ConvertPayloadToString()}\""); + Task.Run(async () => + { + try + { + + var result = await _dbContext.SaveAsync(keyValues, device, DataSide.ClientSide); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Can't upload attributes to device {device.Name}({device.Id}).the payload is \"{e.ApplicationMessage.ConvertPayloadToString()}\""); + } + }); } - }); + } + } + } + } + } + + private async Task RequestAttributes(string[] tpary, Dictionary keyValues,Device device) + { + if (tpary.Length>5 && tpary[4] == "xml" ) + { + var qf = from at in _dbContext.AttributeData where at.Type == DataType.XML && at.KeyName == tpary[5] select at; + await _serverEx.PublishAsync($"/devices/me/attributes/response/{tpary[5]}", qf.FirstOrDefault()?.Value_XML); + } + else if (tpary.Length > 5 && tpary[4] == "binary") + { + var qf = from at in _dbContext.AttributeData where at.Type == DataType.Binary && at.KeyName == tpary[5] select at; + await _serverEx.PublishAsync(new MqttApplicationMessage() { Topic = $"/devices/me/attributes/response/{tpary[5]}", Payload = qf.FirstOrDefault()?.Value_Binary }); + } + else + { + Dictionary reps = new Dictionary(); + var reqid = tpary.Length > 4 ? tpary[4] : Guid.NewGuid().ToString(); + List datas = new List(); + foreach (var kx in keyValues) + { + var keys = kx.Value?.ToString().Split(','); + if (keys != null && keys.Length > 0) + { + if (Enum.TryParse(kx.Key, true, out DataSide ds)) + { + var qf = from at in _dbContext.AttributeData where at.Device== device && at.DataSide == ds && keys.Contains(at.KeyName) select at; + datas.AddRange(await qf.ToArrayAsync()); + } + } + } + + + foreach (var item in datas) + { + switch (item.Type) + { + case DataType.Boolean: + reps.Add(item.KeyName, item.Value_Boolean); + break; + case DataType.String: + reps.Add(item.KeyName, item.Value_String); + break; + case DataType.Long: + reps.Add(item.KeyName, item.Value_Long); + break; + case DataType.Double: + reps.Add(item.KeyName, item.Value_Double); + break; + case DataType.Json: + reps.Add(item.KeyName, Newtonsoft.Json.Linq.JToken.Parse(item.Value_Json)); + break; + case DataType.XML: + reps.Add(item.KeyName, item.Value_XML); + break; + case DataType.Binary: + reps.Add(item.KeyName, item.Value_Binary); + break; + case DataType.DateTime: + reps.Add(item.KeyName, item.Value_DateTime); + break; + default: + reps.Add(item.KeyName, item.Value_Json); + break; } } + await _serverEx.PublishAsync($"/devices/me/attributes/response/{reqid}", Newtonsoft.Json.JsonConvert.SerializeObject(reps)); } - + } internal void Server_ClientDisconnected(IMqttServerEx server, MqttServerClientDisconnectedEventArgs args) -- GitLab