diff --git a/IoTSharp/Extensions/EventBusExtensions.cs b/IoTSharp/Extensions/EventBusExtensions.cs index df3a77809ff03d529bb26a991fdc84ddb2d8ee76..4f5710dbff839c7da7f5e5b0f36ee248092074f2 100644 --- a/IoTSharp/Extensions/EventBusExtensions.cs +++ b/IoTSharp/Extensions/EventBusExtensions.cs @@ -21,7 +21,7 @@ namespace IoTSharp.Extensions } public static void PublishDeviceStatus(this ICapPublisher cap, Guid devid , bool status) { - cap.Publish("iotsharp.services.datastream.devicestatus", new { Device=devid, Status=status } ); + cap.Publish("iotsharp.services.datastream.devicestatus", new DeviceStatus { DeviceId=devid, Status=status } ); } } } diff --git a/IoTSharp/Handlers/EventBusHandler.cs b/IoTSharp/Handlers/EventBusHandler.cs index f4c41f57f45cdcfd29357c4604866e856d063ebe..4949af6dd46e3d647d095f8a2a9400bcdea3674a 100644 --- a/IoTSharp/Handlers/EventBusHandler.cs +++ b/IoTSharp/Handlers/EventBusHandler.cs @@ -128,28 +128,43 @@ namespace IoTSharp.Handlers [CapSubscribe("iotsharp.services.datastream.devicestatus")] public void DeviceStatus(DeviceStatus status) { - using (var _scope = _scopeFactor.CreateScope()) + try { - using (var _dbContext = _scope.ServiceProvider.GetRequiredService()) + using (var _scope = _scopeFactor.CreateScope()) { - var dev = _dbContext.Device.Find(status.DeviceId); - if (dev.Online == true && status.Status == false) - { - dev.Online = false; - dev.LastActive = DateTime.Now; - Task.Run(()=> RunRules(dev.Id, status, MountType.Online)); - //真正掉线 - } - else if (dev.Online == false && status.Status == true) + using (var _dbContext = _scope.ServiceProvider.GetRequiredService()) { - dev.Online = true; - dev.LastActive = DateTime.Now; - Task.Run(() => RunRules(dev.Id, status, MountType.Offline)); - //真正离线 + var dev = _dbContext.Device.FirstOrDefault(d=>d.Id==status.DeviceId); + if (dev != null) + { + if (dev.Online == true && status.Status == false) + { + dev.Online = false; + dev.LastActive = DateTime.Now; + Task.Run(() => RunRules(dev.Id, status, MountType.Online)); + //真正掉线 + } + else if (dev.Online == false && status.Status == true) + { + dev.Online = true; + dev.LastActive = DateTime.Now; + Task.Run(() => RunRules(dev.Id, status, MountType.Offline)); + //真正离线 + } + _dbContext.SaveChanges(); + } + else + { + _logger.LogWarning( $"未找到设备{status.DeviceId} ,因此无法处理设备状态"); + } } - _dbContext.SaveChanges(); } } + catch (Exception ex) + { + _logger.LogError(ex, $"处理{status.DeviceId} 的状态{status.Status} 时遇到异常:{ex.Message}"); + + } } diff --git a/IoTSharp/Handlers/MQTTServerHandler.cs b/IoTSharp/Handlers/MQTTServerHandler.cs index 490e3c16077c5315ffb05d1574088fde0db2b653..e0fc4c2ab970e1a2f762694def0eeb1490704383 100644 --- a/IoTSharp/Handlers/MQTTServerHandler.cs +++ b/IoTSharp/Handlers/MQTTServerHandler.cs @@ -81,9 +81,10 @@ namespace IoTSharp.Handlers try { _logger.LogInformation($"Server received {e.ClientId}'s message: Topic=[{e.ApplicationMessage.Topic }],Retain=[{e.ApplicationMessage.Retain}],QualityOfServiceLevel=[{e.ApplicationMessage.QualityOfServiceLevel}]"); - string topic = e.ApplicationMessage.Topic.ToLower(); + string topic = e.ApplicationMessage.Topic; var tpary = topic.Split('/', StringSplitOptions.RemoveEmptyEntries); var _dev = await FoundDevice(e.ClientId); + if (tpary.Length >= 3 && tpary[0] == "devices" && _dev != null) { Device device = JudgeOrCreateNewDevice(tpary, _dev); @@ -166,7 +167,8 @@ namespace IoTSharp.Handlers } else { - _logger.LogWarning($"不支持{e.ClientId}的{e.ApplicationMessage.Topic}格式"); + //tpary.Length >= 3 && tpary[0] == "devices" && _dev != null + _logger.LogWarning($"不支持{e.ClientId}的{e.ApplicationMessage.Topic}格式,Length:{tpary.Length },{tpary[0] },{ _dev != null}"); } } catch (Exception ex) @@ -180,10 +182,12 @@ namespace IoTSharp.Handlers private void ResetDeviceStatus(Device device,bool status=true) { - if (device.DeviceType == DeviceType.Device && device.Owner != null && device.Owner?.Id != null)//虚拟设备上线 + _logger.LogInformation($"重置状态{device.Id} {device.Name}"); + if (device.DeviceType == DeviceType.Device && device.Owner != null && device.Owner?.Id != null && device.Owner?.Id !=Guid.Empty)//虚拟设备上线 { _queue.PublishDeviceStatus(device.Id, status); _queue.PublishDeviceStatus(device.Owner.Id, status); + _logger.LogInformation($"重置网关状态{device.Owner.Id} {device.Owner.Name}"); } else { @@ -244,9 +248,26 @@ namespace IoTSharp.Handlers private async Task FoundDevice(string clientid) { - var ss = await _serverEx.GetSessionStatusAsync(); - var _device = ss.FirstOrDefault(s => s.ClientId == clientid)?.Items?.FirstOrDefault(k => (string)k.Key == nameof(Device)).Value as Device; - return _device; + Device device = null; + var clients = await _serverEx.GetClientStatusAsync(); + var client = clients.FirstOrDefault(c => c.ClientId == clientid); + if (client != null) + { + device = client.Session?.Items?.FirstOrDefault(k => (string)k.Key == nameof(Device)).Value as Device; + if (device==null) + { + if (clientid != _mcsetting.MqttBroker) + { + _logger.LogWarning($"未能找到客户端{clientid }回话附加的设备信息,现在断开此链接。 "); + await client.DisconnectAsync(); + } + } + } + else + { + _logger.LogWarning($"未能找到客户端{clientid }上下文信息"); + } + return device; } private async Task RequestAttributes(string[] tpary, Dictionary keyValues, Device device) @@ -462,7 +483,7 @@ namespace IoTSharp.Handlers Uri uri = new Uri("mqtt://" + obj.Endpoint); isLoopback = uri.IsLoopback; } - if (isLoopback && !string.IsNullOrEmpty(e.Context.ClientId) && e.Context.ClientId == _mcsetting.MqttBroker && !string.IsNullOrEmpty(e.Context.Username) && e.Context.Username == _mcsetting.UserName && e.Context.Password == _mcsetting.Password) + if (isLoopback && !string.IsNullOrEmpty(e.Context.ClientId) && e.Context.ClientId == _mcsetting.MqttBroker && !string.IsNullOrEmpty(e.Context.Username) ) { e.Context.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.Success; }