提交 8df3cb04 编写于 作者: 麦壳饼's avatar 麦壳饼

将消息事件 移动为单独的项目, 方便下一步重构。

上级 cc1454d8
using DotNetCore.CAP;
using Dynamitey;
using IoTSharp.Contracts;
using IoTSharp.Data;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.EventBus
{
public static class EventBusExtensions
{
public static void PublishAttributeData(this ICapPublisher cap, PlayloadData msg)
{
cap.Publish("iotsharp.services.datastream.attributedata", msg);
}
public static void PublishAttributeData(this ICapPublisher cap, Device device, Dictionary<string, object> keyValues)
{
cap.PublishAttributeData(new PlayloadData() { DeviceId = device.Id, MsgBody = keyValues, DataSide = DataSide.ClientSide, DataCatalog = DataCatalog.AttributeData });
}
public static void PublishTelemetryData(this ICapPublisher cap, PlayloadData msg)
{
cap.Publish("iotsharp.services.datastream.telemetrydata", msg);
}
public static void PublishTelemetryData(this ICapPublisher cap, Device device, Dictionary<string, object> keyValues)
{
cap.PublishTelemetryData(new PlayloadData() { DeviceId = device.Id, MsgBody = keyValues, DataSide = DataSide.ClientSide, DataCatalog = DataCatalog.TelemetryData });
}
public static void PublishDeviceStatus(this ICapPublisher cap, Guid devid , DeviceStatus devicestatus)
{
cap.Publish("iotsharp.services.datastream.devicestatus", new PlayloadData { DeviceId=devid, DeviceStatus= devicestatus } );
}
public static void PublishSubDeviceOnline(this ICapPublisher _queue, Guid _gatewaydevid, Device subdev)
{
//如果是_dev的子设备, 则更新状态。
if (!subdev.Online && subdev.DeviceType == DeviceType.Device && subdev.Id != _gatewaydevid)
{
_queue.PublishDeviceStatus(subdev.Id, DeviceStatus.Good);
}
}
public static void PublishDeviceAlarm(this ICapPublisher cap, CreateAlarmDto alarmDto)
{
cap.Publish("iotsharp.services.datastream.alarm", alarmDto);
}
}
}
using DotNetCore.CAP;
using EasyCaching.Core;
using IoTSharp.Contracts;
using IoTSharp.Data;
using IoTSharp.Data.Extensions;
using IoTSharp.EventBus;
using IoTSharp.Extensions;
using IoTSharp.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Dynamic;
using System.Linq;
using System.Threading.Tasks;
namespace IoTSharp.EventBus
{
public interface IEventBusHandler
{
public void StoreAttributeData(PlayloadData msg);
public void StoreTelemetryData(PlayloadData msg);
}
public class EventBusHandler : IEventBusHandler, ICapSubscribe
{
private readonly AppSettings _appSettings;
private readonly ILogger _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly IStorage _storage;
private readonly IEasyCachingProvider _caching;
private readonly EventBusOption _eventBusOption;
public EventBusHandler(ILogger<EventBusHandler> logger, IServiceScopeFactory scopeFactor
, IStorage storage, IEasyCachingProviderFactory factory, EventBusOption eventBusOption
)
{
string _hc_Caching = $"{nameof(CachingUseIn)}-{Enum.GetName(eventBusOption.AppSettings.CachingUseIn)}";
_logger = logger;
_scopeFactor = scopeFactor;
_storage = storage;
_caching = factory.GetCachingProvider(_hc_Caching);
_eventBusOption = eventBusOption;
}
[CapSubscribe("iotsharp.services.datastream.attributedata")]
public async void StoreAttributeData(PlayloadData msg)
{
try
{
using (var _scope = _scopeFactor.CreateScope())
{
using (var _dbContext = _scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var device = _dbContext.Device.FirstOrDefault(d => d.Id == msg.DeviceId);
if (device != null)
{
var mb = msg.MsgBody;
Dictionary<string, object> dc = new Dictionary<string, object>();
mb.ToList().ForEach(kp =>
{
if (kp.Value?.GetType() == typeof(System.Text.Json.JsonElement))
{
var je = (System.Text.Json.JsonElement)kp.Value;
switch (je.ValueKind)
{
case System.Text.Json.JsonValueKind.Undefined:
case System.Text.Json.JsonValueKind.Object:
case System.Text.Json.JsonValueKind.Array:
dc.Add(kp.Key, je.GetRawText());
break;
case System.Text.Json.JsonValueKind.String:
dc.Add(kp.Key, je.GetString());
break;
case System.Text.Json.JsonValueKind.Number:
dc.Add(kp.Key, je.GetDouble());
break;
case System.Text.Json.JsonValueKind.True:
case System.Text.Json.JsonValueKind.False:
dc.Add(kp.Key, je.GetBoolean());
break;
case System.Text.Json.JsonValueKind.Null:
break;
default:
break;
}
}
else if (kp.Value != null)
{
dc.Add(kp.Key, kp.Value);
}
});
var result2 = await _dbContext.SaveAsync<AttributeLatest>(dc, device.Id, msg.DataSide);
result2.exceptions?.ToList().ForEach(ex =>
{
_logger.LogError($"{ex.Key} {ex.Value} {Newtonsoft.Json.JsonConvert.SerializeObject(msg.MsgBody[ex.Key])}");
});
_logger.LogInformation($"更新{device.Name}({device.Id})属性数据结果{result2.ret}");
ExpandoObject obj = new ExpandoObject();
dc.ToList().ForEach(kv =>
{
obj.TryAdd(kv.Key, kv.Value);
});
await RunRules(msg.DeviceId, obj, MountType.Attribute);
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "StoreAttributeData:"+ex.Message);
}
}
private async Task RunRules(Guid deviceId, object obj, MountType attribute)
{
await _eventBusOption.RunRules(deviceId, obj, attribute);
}
[CapSubscribe("iotsharp.services.datastream.alarm")]
public async void OccurredAlarm(CreateAlarmDto alarmDto)
{
try
{
using (var _scope = _scopeFactor.CreateScope())
{
using (var _dbContext = _scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var alm = await _dbContext.OccurredAlarm(alarmDto);
if (alm.Code ==(int) ApiCode.Success)
{
alarmDto.warnDataId = alm.Data.Id;
alarmDto.CreateDateTime = alm.Data.AckDateTime;
if (alm.Data.Propagate)
{
await RunRules(alm.Data.OriginatorId, alarmDto, MountType.Alarm);
}
}
else
{
//如果设备通过网关创建, 当警告先来, 设备创建再后会出现此问题。
_logger.LogWarning( $"处理{alarmDto.OriginatorName} 的告警{alarmDto.AlarmType} 错误:{alm.Code}-{alm.Msg}");
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"处理{alarmDto.OriginatorName} 的告警{alarmDto.AlarmType} 时遇到异常:{ex.Message}");
}
}
[CapSubscribe("iotsharp.services.datastream.devicestatus")]
public void DeviceStatusEvent( PlayloadData status)
{
try
{
using (var _scope = _scopeFactor.CreateScope())
{
using (var _dbContext = _scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var dev = _dbContext.Device.FirstOrDefault(d=>d.Id==status.DeviceId);
if (dev != null)
{
if (dev.Online == true && status.DeviceStatus != DeviceStatus.Good)
{
dev.Online = false;
dev.LastActive = DateTime.Now;
Task.Run(() => RunRules(dev.Id, status, MountType.Offline));
//真正离线
}
else if (dev.Online == false && status.DeviceStatus== DeviceStatus.Good)
{
dev.Online = true;
dev.LastActive = DateTime.Now;
Task.Run(() => RunRules(dev.Id, status, MountType.Online));
//真正掉线
}
_dbContext.SaveChanges();
}
else
{
_logger.LogWarning( $"未找到设备{status.DeviceId} ,因此无法处理设备状态");
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"处理{status.DeviceId} 的状态{status.DeviceStatus} 时遇到异常:{ex.Message}");
}
}
[CapSubscribe("iotsharp.services.datastream.telemetrydata")]
public async void StoreTelemetryData(PlayloadData msg)
{
var result = await _storage.StoreTelemetryAsync(msg);
var data = from t in result.telemetries
select new TelemetryDataDto() { DateTime = t.DateTime, DataType=t.Type, KeyName = t.KeyName, Value = t.ToObject() };
var array = data.ToList();
ExpandoObject exps = new();
array.ForEach(td =>
{
exps.TryAdd(td.KeyName, td.Value);
});
await RunRules(msg.DeviceId, (dynamic)exps, MountType.Telemetry);
await RunRules(msg.DeviceId, array, MountType.TelemetryArray);
}
}
}
\ No newline at end of file
using DotNetCore.CAP.Dashboard.NodeDiscovery;
using MaiKeBing.HostedService.ZeroMQ;
using Microsoft.Extensions.DependencyInjection;
using IoTSharp.Contracts;
namespace IoTSharp.EventBus
{
public class EventBusOption
{
public AppSettings AppSettings { get; set; }
public ZMQOption ZMQOption { get; set; }
public DiscoveryOptions DiscoveryOptions { get; set; }
public string EventBusStore { get; set; }
public string EventBusMQ { get; set; }
public IHealthChecksBuilder HealthChecks { get; set; }
public delegate Task RunRulesEventHander(Guid devid, object obj, MountType mountType);
public RunRulesEventHander RunRules;
}
}

using DotNetCore.CAP;
using DotNetCore.CAP.Dashboard.NodeDiscovery;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using Savorboard.CAP.InMemoryMessageQueue;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using IoTSharp.Contracts;
using static IoTSharp.EventBus.EventBusOption;
namespace IoTSharp.EventBus
{
public static class EventsBusDependencyInjection
{
public static IApplicationBuilder UseEventBus(this IApplicationBuilder app, Func<EventBusOption, RunRulesEventHander> action)
{
app.UseCapDashboard();
var provider = app.ApplicationServices;
var options = provider.GetService<EventBusOption>();
var hander= action.Invoke(options);
options.RunRules += hander;
return app;
}
public static void AddEventBus(this IServiceCollection services, Action<EventBusOption> opt)
{
var opts = new EventBusOption();
opt.Invoke(opts);
var settings = opts.AppSettings;
DiscoveryOptions _discovery = opts.DiscoveryOptions;
var healthChecks = opts.HealthChecks;
services.AddTransient<IEventBusHandler, EventBusHandler>();
var _EventBusStore = opts.EventBusStore;
var _EventBusMQ = opts.EventBusMQ;
if (opts.ZMQOption != null)
{
services.AddHostedZeroMQ(cfg => cfg = opts.ZMQOption);
}
services.AddSingleton(opts );
services.AddTransient<IEventBusHandler, EventBusHandler>();
services.AddCap(x =>
{
string _hc_EventBusStore = $"{nameof(EventBusStore)}-{Enum.GetName(settings.EventBusStore)}";
x.SucceedMessageExpiredAfter = settings.SucceedMessageExpiredAfter;
x.ConsumerThreadCount = settings.ConsumerThreadCount;
switch (settings.EventBusStore)
{
case EventBusStore.PostgreSql:
x.UsePostgreSql(_EventBusStore);
healthChecks.AddNpgSql(_EventBusStore, name: _hc_EventBusStore);
break;
case EventBusStore.MongoDB:
x.UseMongoDB(_EventBusStore); //注意,仅支持MongoDB 4.0+集群
healthChecks.AddMongoDb(_EventBusStore, name: _hc_EventBusStore);
break;
case EventBusStore.LiteDB:
x.UseLiteDBStorage(_EventBusStore);
break;
case EventBusStore.MySql:
x.UseMySql(_EventBusStore);
break;
case EventBusStore.SqlServer:
x.UseSqlServer(_EventBusStore);
break;
case EventBusStore.InMemory:
default:
x.UseInMemoryStorage();
break;
}
string _hc_EventBusMQ = $"{nameof(EventBusMQ)}-{Enum.GetName(settings.EventBusMQ)}";
switch (settings.EventBusMQ)
{
case EventBusMQ.RabbitMQ:
var url = new Uri(_EventBusMQ);
x.UseRabbitMQ(cfg =>
{
cfg.ConnectionFactoryOptions = cf =>
{
cf.AutomaticRecoveryEnabled = true;
cf.Uri = new Uri(_EventBusMQ);
};
});
//amqp://guest:guest@localhost:5672
healthChecks.AddRabbitMQ(connectionFactory =>
{
var factory = new ConnectionFactory()
{
Uri = new Uri(_EventBusMQ),
AutomaticRecoveryEnabled = true
};
return factory.CreateConnection();
}, _hc_EventBusMQ);
break;
case EventBusMQ.Kafka:
x.UseKafka(_EventBusMQ);
healthChecks.AddKafka(cfg =>
{
cfg.BootstrapServers = _EventBusMQ;
}, name: _hc_EventBusMQ);
break;
case EventBusMQ.ZeroMQ:
x.UseZeroMQ(cfg =>
{
cfg.HostName = _EventBusMQ ?? "127.0.0.1";
cfg.Pattern = MaiKeBing.CAP.NetMQPattern.PushPull;
});
break;
case EventBusMQ.AzureServiceBus:
x.UseAzureServiceBus(_EventBusMQ);
break;
case EventBusMQ.AmazonSQS:
x.UseAmazonSQS(opts =>
{
var uri = new Uri(_EventBusMQ);
if (!string.IsNullOrEmpty(uri.UserInfo) && uri.UserInfo?.Split(':').Length == 2)
{
var userinfo = uri.UserInfo.Split(':');
opts.Credentials = new Amazon.Runtime.BasicAWSCredentials(userinfo[0], userinfo[1]);
}
opts.Region = Amazon.RegionEndpoint.GetBySystemName(uri.Host);
});
break;
case EventBusMQ.RedisStreams:
x.UseRedis(_EventBusMQ);
break;
case EventBusMQ.NATS:
x.UseNATS(_EventBusMQ);
break;
case EventBusMQ.Pulsar:
x.UsePulsar(_EventBusMQ);
break;
case EventBusMQ.InMemory:
default:
x.UseInMemoryMessageQueue();
break;
}
x.UseDashboard();
if (_discovery != null)
{
x.UseDiscovery(cfg => cfg = _discovery);
}
});
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.NpgSql" Version="6.0.2" />
<PackageReference Include="CAP.Extensions" Version="1.0.35" />
<PackageReference Include="DotNetCore.CAP.Dashboard" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.InMemoryStorage" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.Kafka" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.MongoDB" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.MySql" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.PostgreSql" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.SqlServer" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.AzureServiceBus" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.AmazonSQS" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.NATS" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.RedisStreams" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.Pulsar" Version="6.1.0" />
<PackageReference Include="EasyCaching.Core" Version="1.6.1" />
<PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="6.0.2" />
<PackageReference Include="AspNetCore.HealthChecks.MongoDb" Version="6.0.2" />
<PackageReference Include="AspNetCore.HealthChecks.Rabbitmq" Version="6.0.2" />
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="6.0.4" />
<PackageReference Include="MaiKeBing.CAP.ZeroMQ" Version="1.0.35" />
<PackageReference Include="MaiKeBing.CAP.LiteDB" Version="1.0.35" />
<PackageReference Include="MaiKeBing.HostedService.ZeroMQ" Version="1.0.35" />
<PackageReference Include="Savorboard.CAP.InMemoryMessageQueue" Version="6.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\IoTSharp.Contracts\IoTSharp.Contracts.csproj" />
<ProjectReference Include="..\IoTSharp.Data.TimeSeries\IoTSharp.Data.TimeSeries.csproj" />
<ProjectReference Include="..\IoTSharp.Data\IoTSharp.Data.csproj" />
</ItemGroup>
</Project>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册