提交 1cb8a249 编写于 作者: 麦壳饼's avatar 麦壳饼

重写上下线脚本。

上级 a22b2f9a
......@@ -5,7 +5,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Oracle" Version="5.0.1" />
<PackageReference Include="AspNetCore.HealthChecks.Oracle" Version="6.0.1-rc2.3" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Core" Version="6.0.1-rc2.4" />
<PackageReference Include="AspNetCore.HealthChecks.UI.InMemory.Storage" Version="6.0.1-rc2.4" />
<PackageReference Include="EFCore.Sharding.Oracle" Version="5.0.21" />
......
......@@ -6,6 +6,11 @@ using System.Threading.Tasks;
namespace IoTSharp.Data
{
public class DeviceStatus
{
public Guid DeviceId { get; set; }
public bool Status { get; set; }
}
public class RawMsg
{
......
......@@ -19,6 +19,9 @@ namespace IoTSharp.Extensions
{
cap.Publish("iotsharp.services.datastream.telemetrydata", msg);
}
public static void PublishDeviceStatus(this ICapPublisher cap, Guid devid , bool status)
{
cap.Publish("iotsharp.services.datastream.devicestatus", new { Device=devid, Status=status } );
}
}
}
......@@ -111,20 +111,8 @@ namespace IoTSharp
return externalPath + internalUiRoute;
});
}
/// <summary>
/// //如果上次活动时间距离当前时间超过10秒 或者 设备离线状态, 则更新状态。
/// </summary>
/// <param name="device"></param>
internal static void CheckOrUpdateDevStatus(this Device device)
{
if (DateTime.Now.Subtract(device.LastActive).TotalSeconds > 10 || device.Online == false)
{
device.Online = true;
device.LastActive = DateTime.Now;
}
}
internal static HealthChecks.UI.Configuration.Settings AddIoTSharpHealthCheckEndpoint(this HealthChecks.UI.Configuration.Settings setup)
{
......
......@@ -46,7 +46,6 @@ namespace IoTSharp.Handlers
[CapSubscribe("iotsharp.services.datastream.attributedata")]
public async void StoreAttributeData(RawMsg msg)
{
using (var _scope = _scopeFactor.CreateScope())
{
using (var _dbContext = _scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
......@@ -54,12 +53,6 @@ namespace IoTSharp.Handlers
var device = _dbContext.Device.FirstOrDefault(d => d.Id == msg.DeviceId);
if (device != null)
{
var old = device.Online;
device.CheckOrUpdateDevStatus();
if (old!=device.Online)
{
_logger.LogInformation($"通过属性修改变更设备状态 {device.Id}-{device.Name}在线.最后活动时间{device.LastActive}");
}
var mb = msg.MsgBody;
Dictionary<string, object> dc = new Dictionary<string, object>();
mb.ToList().ForEach(kp =>
......@@ -131,44 +124,45 @@ namespace IoTSharp.Handlers
}
}
}
[CapSubscribe("iotsharp.services.datastream.telemetrydata")]
public async void StoreTelemetryData(RawMsg msg)
}
[CapSubscribe("iotsharp.services.datastream.devicestatus")]
public void DeviceStatus(DeviceStatus status)
{
var result= await _storage.StoreTelemetryAsync(msg);
if (!_check_device_status.ContainsKey(msg.DeviceId))
{
_check_device_status.Add(msg.DeviceId, DateTime.Now);
}
if (_check_device_status[msg.DeviceId].Subtract(DateTime.Now).TotalSeconds > 60)
using (var _scope = _scopeFactor.CreateScope())
{
_check_device_status[msg.DeviceId] = DateTime.Now;
using (var scope = _scopeFactor.CreateScope())
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
using (var _dbContext = _scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var device = _dbContext.Device.FirstOrDefault(d => d.Id == msg.DeviceId);
if (device != null)
var dev = _dbContext.Device.Find(status.DeviceId);
if (dev.Online == true && status.Status == false)
{
var old = device.Online;
device.CheckOrUpdateDevStatus();
if (old != device.Online)
{
_logger.LogInformation($"通过属性修改变更设备状态 {device.Id}-{device.Name}在线.最后活动时间{device.LastActive}");
}
await _dbContext.SaveChangesAsync();
dev.Online = false;
dev.LastActive = DateTime.Now;
//真正掉线
}
else if (dev.Online == false && status.Status == true)
{
dev.Online = true;
dev.LastActive = DateTime.Now;
//真正离线
}
_dbContext.SaveChanges();
}
}
}
[CapSubscribe("iotsharp.services.datastream.telemetrydata")]
public async void StoreTelemetryData(RawMsg msg)
{
var result = await _storage.StoreTelemetryAsync(msg);
ExpandoObject exps = new ExpandoObject();
result.telemetries.ForEach(td =>
{
exps.TryAdd(td.KeyName, td.ToObject());
});
await RunRules(msg.DeviceId,(dynamic)exps, MountType.Telemetry);
await RunRules(msg.DeviceId, (dynamic)exps, MountType.Telemetry);
}
private async Task RunRules(Guid devid, object obj, MountType mountType)
......
......@@ -69,7 +69,6 @@ namespace IoTSharp.Handlers
{
_logger.LogInformation($"Server is stopped");
}
Dictionary<string, int> lstTopics = new Dictionary<string, int>();
long received = 0;
internal async void Server_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
......@@ -82,20 +81,7 @@ namespace IoTSharp.Handlers
try
{
_logger.LogInformation($"Server received {e.ClientId}'s message: Topic=[{e.ApplicationMessage.Topic }],Retain=[{e.ApplicationMessage.Retain}],QualityOfServiceLevel=[{e.ApplicationMessage.QualityOfServiceLevel}]");
if (!lstTopics.ContainsKey(e.ApplicationMessage.Topic))
{
lstTopics.Add(e.ApplicationMessage.Topic, 1);
await _serverEx.PublishAsync("$SYS/broker/subscriptions/count", lstTopics.Count.ToString());
}
else
{
lstTopics[e.ApplicationMessage.Topic]++;
}
if (e.ApplicationMessage.Payload != null)
{
received += e.ApplicationMessage.Payload.Length;
}
string topic = e.ApplicationMessage.Topic;
string topic = e.ApplicationMessage.Topic.ToLower();
var tpary = topic.Split('/', StringSplitOptions.RemoveEmptyEntries);
var _dev = await FoundDevice(e.ClientId);
if (tpary.Length >= 3 && tpary[0] == "devices" && _dev != null)
......@@ -103,6 +89,7 @@ namespace IoTSharp.Handlers
Device device = JudgeOrCreateNewDevice(tpary, _dev);
if (device != null)
{
bool statushavevalue = false;
Dictionary<string, object> keyValues = new Dictionary<string, object>();
if (tpary.Length >= 4)
{
......@@ -133,7 +120,7 @@ namespace IoTSharp.Handlers
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"ConvertPayloadToDictionary Error {topic},{ex.Message}");
_logger.LogWarning(ex, $"转换为字典格式失败 {topic},{ex.Message}");
}
}
if (tpary[2] == "telemetry")
......@@ -150,7 +137,11 @@ namespace IoTSharp.Handlers
{
_queue.PublishAttributeData(new RawMsg() { DeviceId = device.Id, MsgBody = keyValues, DataSide = DataSide.ClientSide, DataCatalog = DataCatalog.AttributeData });
}
}
else if (tpary[2] == "status" )
{
ResetDeviceStatus(device, tpary[3] == "online");
statushavevalue = true;
}
else if (tpary[2] == "rpc")
{
......@@ -158,55 +149,25 @@ namespace IoTSharp.Handlers
{
await ExecFlowRules(e, _dev.DeviceType == DeviceType.Gateway ? device : _dev, tpary[4], MountType.RPC);//完善后改成 RPC
}
}
else
{
await ExecFlowRules(e, _dev.DeviceType == DeviceType.Gateway ? device : _dev, MountType.RAW);//如果是网关
}
if (!statushavevalue)
{
ResetDeviceStatus(device);
}
}
else
{
_logger.LogInformation($"{e.ClientId}的数据{e.ApplicationMessage.Topic}未能匹配到设备");
}
}
else if (_dev?.Owner?.DeviceType== DeviceType.Gateway)
else
{
_logger.LogInformation($"{e.ClientId}的数据{e.ApplicationMessage.Topic}未能识别,分段:{tpary.Length} 前缀?{tpary[0]} 设备:{_dev?.Id} ,终端状态未找到。");
var ss = await _serverEx.GetClientStatusAsync();
var status= ss.FirstOrDefault(s => s.ClientId == e.ClientId);
if (status != null)
{
_logger.LogInformation($"{e.ClientId}的数据{e.ApplicationMessage.Topic}未能识别,分段:{tpary.Length} 前缀?{tpary[0]} 设备:{_dev?.Id} {status.ConnectedTimestamp} {status.Endpoint} ");
if (!status.Session.Items.ContainsKey("iotsharp_count"))
{
status.Session.Items.Add("iotsharp_count", 1);
}
else
{
status.Session.Items["iotsharp_count"] = 1 + (int)status.Session.Items["iotsharp_count"];
}
if (status.Session.Items.TryGetValue("iotsharp_count", out object count))
{
int _count = (int)count;
if (_count > 10)
{
await status.DisconnectAsync();
_logger.LogInformation($"未识别次数太多{_count}");
}
}
else
{
_logger.LogInformation("识别次数获取错误");
}
}
else
{
_logger.LogInformation("设备状态未能获取");
}
_logger.LogWarning($"不支持{e.ClientId}{e.ApplicationMessage.Topic}格式");
}
}
catch (Exception ex)
{
......@@ -216,6 +177,20 @@ namespace IoTSharp.Handlers
}
}
private void ResetDeviceStatus(Device device,bool status=true)
{
if (device.DeviceType == DeviceType.Device && device.Owner != null && device.Owner?.Id != null)//虚拟设备上线
{
_queue.PublishDeviceStatus(device.Id, status);
_queue.PublishDeviceStatus(device.Owner.Id, status);
}
else
{
_queue.PublishDeviceStatus(device.Id, status);
}
}
private async Task ExecFlowRules(MqttApplicationMessageReceivedEventArgs e, Device _dev, string method, MountType mount)
{
var rules = await _caching.GetAsync($"ruleid_{_dev.Id}_rpc_{method}", async () =>
......@@ -406,23 +381,17 @@ namespace IoTSharp.Handlers
devicedatato = new Device() { Id = Guid.NewGuid(), Name = tpary[1], DeviceType = DeviceType.Device, Tenant = gw.Tenant, Customer = gw.Customer, Owner = gw, LastActive = DateTime.Now, Timeout = 300 };
gw.Children.Add(devicedatato);
_dbContext.AfterCreateDevice(devicedatato);
gw.LastActive = DateTime.Now;
gw.Online = true;
_logger.LogInformation($"网关 {gw.Id}-{gw.Name}在线.最后活动时间{gw.LastActive},添加了子设备{devicedatato.Name}");
}
else
{
devicedatato = subdev.FirstOrDefault();
devicedatato.LastActive = DateTime.Now;
devicedatato.Online = true;
_logger.LogInformation($"网关子设备 {devicedatato.Id}-{devicedatato.Name}在线.最后活动时间{devicedatato.LastActive}");
}
}
else
{
devicedatato = _dbContext.Device.Find(device.Id);
devicedatato.LastActive = DateTime.Now;
devicedatato.Online = true;
_logger.LogInformation($"独立设备 {devicedatato.Id}-{devicedatato.Name}在线.最后活动时间{devicedatato.LastActive}");
}
_dbContext.SaveChanges();
......
......@@ -43,18 +43,13 @@ namespace IoTSharp.Jobs
{
try
{
var _device = cs.Session.Items?.FirstOrDefault(k => (string)k.Key == nameof(Device)).Value as Device;
if (_device != null)
{
var d = _dbContext.Device.FirstOrDefault(d => d.Id == _device.Id);
if (d != null)
{
d.LastActive = cs.LastPacketReceivedTimestamp.ToLocalTime();
d.Online = DateTime.Now.Subtract(d.LastActive).TotalSeconds < d.Timeout;
_logger.LogInformation($"MQTT状态设备{cs.ClientId}-{d.Name}({d.Id},{cs.Endpoint}) 最后活动时间{d.LastActive} 是否在线{d.Online} 发送消息:{cs.SentApplicationMessagesCount}({cs.BytesSent}kb) 收到{cs.ReceivedApplicationMessagesCount}({cs.BytesReceived / 1024}KB ) ");
if (!d.Online && DateTime.Now.Subtract(d.LastActive).TotalSeconds > d.Timeout * 5)
if (!d.Online && DateTime.Now.Subtract(d.LastActive).TotalSeconds > d.Timeout)
{
Task.Run(cs.DisconnectAsync);
}
......@@ -72,11 +67,9 @@ namespace IoTSharp.Jobs
//当前时间减去最后活跃时间如果小于超时时间, 则为在线, 否则就是离线
_dbContext.Device.ToList().ForEach(d =>
{
bool _old = d.Online;
d.Online = DateTime.Now.Subtract(d.LastActive).TotalSeconds < d.Timeout;
if (_old != d.Online)
if (d.Online && DateTime.Now.Subtract(d.LastActive).TotalSeconds > d.Timeout)
{
_logger.LogInformation($"根据最后活动时间改变设备状态 {d.Id}-{d.Name} {_old} { d.Online }.最后活动时间{d.LastActive}");
d.Online = false;
}
});
var saveresult = await _dbContext.SaveChangesAsync();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册