提交 29814e31 编写于 作者: 麦壳饼's avatar 麦壳饼

通过 controllers 实现 topic 处理

上级 300cd6a0
......@@ -64,13 +64,21 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Github Workflows", "Github Workflows", "{59A51D63-F292-46F0-A4BF-975C98966DAA}"
ProjectSection(SolutionItems) = preProject
appveyor.yml = appveyor.yml
.github\workflows\dotnet-publish.yml = .github\workflows\dotnet-publish.yml
.github\workflows\docs-deploy.yml = .github\workflows\docs-deploy.yml
.github\workflows\dotnet-build.yml = .github\workflows\dotnet-build.yml
.github\workflows\dotnet-publish.yml = .github\workflows\dotnet-publish.yml
.github\FUNDING.yml = .github\FUNDING.yml
.github\workflows\sonar_analyze.yml = .github\workflows\sonar_analyze.yml
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore.Routing", "MQTTnet.AspNetCore.Routing\Source\MQTTnet.AspNetCore.Routing.csproj", "{EA5E09EC-3490-4CB8-85F7-2F5D3A3B7BD5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ExampleServer", "MQTTnet.AspNetCore.Routing\ExampleServer\ExampleServer.csproj", "{459AFEA0-78A0-471F-B43A-CCBF0B640131}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ExampleClient", "MQTTnet.AspNetCore.Routing\ExampleClient\ExampleClient.csproj", "{5A80EC19-D6AC-4CE0-9DA0-026C7C7C5282}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "MQTTRouting", "MQTTRouting", "{3931E58B-9E76-45DB-9DBF-2B1495C26BA1}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -137,6 +145,18 @@ Global
{9C6FA709-3334-4C39-922D-6FF5ADB8C683}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9C6FA709-3334-4C39-922D-6FF5ADB8C683}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9C6FA709-3334-4C39-922D-6FF5ADB8C683}.Release|Any CPU.Build.0 = Release|Any CPU
{EA5E09EC-3490-4CB8-85F7-2F5D3A3B7BD5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EA5E09EC-3490-4CB8-85F7-2F5D3A3B7BD5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EA5E09EC-3490-4CB8-85F7-2F5D3A3B7BD5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EA5E09EC-3490-4CB8-85F7-2F5D3A3B7BD5}.Release|Any CPU.Build.0 = Release|Any CPU
{459AFEA0-78A0-471F-B43A-CCBF0B640131}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{459AFEA0-78A0-471F-B43A-CCBF0B640131}.Debug|Any CPU.Build.0 = Debug|Any CPU
{459AFEA0-78A0-471F-B43A-CCBF0B640131}.Release|Any CPU.ActiveCfg = Release|Any CPU
{459AFEA0-78A0-471F-B43A-CCBF0B640131}.Release|Any CPU.Build.0 = Release|Any CPU
{5A80EC19-D6AC-4CE0-9DA0-026C7C7C5282}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5A80EC19-D6AC-4CE0-9DA0-026C7C7C5282}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5A80EC19-D6AC-4CE0-9DA0-026C7C7C5282}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5A80EC19-D6AC-4CE0-9DA0-026C7C7C5282}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -144,6 +164,9 @@ Global
GlobalSection(NestedProjects) = preSolution
{D099C28B-1CB9-4F2B-829C-EADCC69BA095} = {86E9E4E0-24A0-4C89-A98B-B0D41C75B4F1}
{A2A1191C-32ED-482B-AACE-4091EE518CEC} = {86E9E4E0-24A0-4C89-A98B-B0D41C75B4F1}
{EA5E09EC-3490-4CB8-85F7-2F5D3A3B7BD5} = {3931E58B-9E76-45DB-9DBF-2B1495C26BA1}
{459AFEA0-78A0-471F-B43A-CCBF0B640131} = {3931E58B-9E76-45DB-9DBF-2B1495C26BA1}
{5A80EC19-D6AC-4CE0-9DA0-026C7C7C5282} = {3931E58B-9E76-45DB-9DBF-2B1495C26BA1}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CD7ADD62-E3BD-453A-B653-BC70DB9FF817}
......
......@@ -16,10 +16,23 @@ namespace IoTSharp.Extensions
{
cap.Publish("iotsharp.services.datastream.attributedata", msg);
}
public static void PublishAttributeData(this ICapPublisher cap, Device device, Dictionary<string, object> keyValues)
{
cap.PublishTelemetryData(new PlayloadData() { DeviceId = device.Id, MsgBody = keyValues, DataSide = DataSide.ClientSide, DataCatalog = DataCatalog.AttributeData });
}
public static void PublishTelemetryData(this ICapPublisher cap, PlayloadData msg)
{
cap.Publish("iotsharp.services.datastream.telemetrydata", msg);
}
public static void PublishTelemetryData(this ICapPublisher cap, Device device, Dictionary<string, object> keyValues)
{
cap.PublishTelemetryData(new PlayloadData() { DeviceId = device.Id, MsgBody = keyValues, DataSide = DataSide.ClientSide, DataCatalog = DataCatalog.TelemetryData });
}
public static void PublishDeviceStatus(this ICapPublisher cap, Guid devid , DeviceStatus devicestatus)
{
cap.Publish("iotsharp.services.datastream.devicestatus", new PlayloadData { DeviceId=devid, DeviceStatus= devicestatus } );
......
......@@ -15,6 +15,8 @@ using System.Runtime.InteropServices;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
using MQTTnet;
using MQTTnet.AspNetCore.AttributeRouting;
using IoTSharp.Data;
namespace IoTSharp
{
......@@ -23,8 +25,11 @@ namespace IoTSharp
//static private IMqttServer _mqttServer;
public static void AddIoTSharpMqttServer(this IServiceCollection services, MqttBrokerSetting broker)
{
services.AddMqttControllers();
services.AddSingleton<MqttServer>();
services.AddSingleton<MQTTService>();
services.AddMqttTcpServerAdapter();
services.AddHostedMqttServer(options =>
services.AddHostedMqttServerWithServices(options =>
{
options.WithDefaultEndpointPort(broker.Port).WithDefaultEndpoint();
if (broker.EnableTls)
......@@ -50,24 +55,29 @@ namespace IoTSharp
}).AddMqttConnectionHandler()
.AddConnections();
services.AddMqttWebSocketServerAdapter();
services.AddSingleton<MQTTService>();
}
public static void UseIotSharpMqttServer(this IApplicationBuilder app)
{
var mqttEvents = app.ApplicationServices.CreateScope().ServiceProvider.GetService<MQTTService>();
app.UseMqttServer(server =>
{
server.ClientConnectedAsync += mqttEvents.Server_ClientConnectedAsync;
server.StartedAsync += mqttEvents.Server_Started ;
server.StoppedAsync += mqttEvents.Server_Stopped ;
server.InterceptingPublishAsync += mqttEvents.Server_ApplicationMessageReceived;
server .ClientSubscribedTopicAsync += mqttEvents.Server_ClientSubscribedTopic;
server.WithAttributeRouting(app.ApplicationServices, true);
server.ClientConnectedAsync += mqttEvents.Server_ClientConnectedAsync;
server.StartedAsync += mqttEvents.Server_Started;
server.StoppedAsync += mqttEvents.Server_Stopped;
// server.InterceptingPublishAsync += mqttEvents.Server_ApplicationMessageReceived;
server.ClientSubscribedTopicAsync += mqttEvents.Server_ClientSubscribedTopic;
server.ClientUnsubscribedTopicAsync += mqttEvents.Server_ClientUnsubscribedTopic;
server.ValidatingConnectionAsync += mqttEvents.Server_ClientConnectionValidator;
server.ClientDisconnectedAsync +=mqttEvents.Server_ClientDisconnected;
server.ClientDisconnectedAsync += mqttEvents.Server_ClientDisconnected;
});
}
public static async Task PublishAsync<T>(this MqttServer mqtt, string SenderClientId, string topic, T _payload) where T : class
{
......
......@@ -208,10 +208,10 @@
<ProjectReference Include="..\IoTSharp.Data\IoTSharp.Data.csproj" />
<ProjectReference Include="..\IoTSharp.Interpreter\IoTSharp.Interpreter.csproj" />
<ProjectReference Include="..\IoTSharp.TaskAction\IoTSharp.TaskAction.csproj" />
<ProjectReference Include="..\MQTTnet.AspNetCore.Routing\Source\MQTTnet.AspNetCore.Routing.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="ClientApp\" />
<Folder Include="Services\MQTTControllers\" />
</ItemGroup>
<ItemGroup>
<Compile Update="Properties\Resources.Designer.cs">
......
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
using System;
using MQTTnet.AspNetCore.AttributeRouting;
using DotNetCore.CAP;
using EasyCaching.Core;
using IoTSharp.FlowRuleEngine;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MQTTnet.Server;
using IoTSharp.Data;
using Dynamitey.DynamicObjects;
using Amazon.SimpleNotificationService.Model;
using System.Collections.Generic;
using MQTTnet;
using IoTSharp.Extensions;
using NATS.Client;
using static IronPython.Modules._ast;
using System.Linq;
using Microsoft.EntityFrameworkCore;
namespace IoTSharp.Services.MQTTControllers
{
[MqttController]
[MqttRoute("[controller]")]
public class DevicesController : MqttBaseController
{
readonly ILogger _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly IEasyCachingProviderFactory _factory;
readonly MqttServer _serverEx;
private readonly ICapPublisher _queue;
private readonly FlowRuleProcessor _flowRuleProcessor;
private readonly IEasyCachingProvider _caching;
private readonly Device _dev;
private readonly MQTTService _service;
readonly MqttClientSetting _mcsetting;
private readonly AppSettings _settings;
public DevicesController(ILogger<DevicesController> logger, IServiceScopeFactory scopeFactor, MQTTService mqttService,
IOptions<AppSettings> options, ICapPublisher queue, IEasyCachingProviderFactory factory, FlowRuleProcessor flowRuleProcessor
)
{
string _hc_Caching = $"{nameof(CachingUseIn)}-{Enum.GetName(options.Value.CachingUseIn)}";
_mcsetting = options.Value.MqttClient;
_settings = options.Value;
_logger = logger;
_scopeFactor = scopeFactor;
_factory = factory;
_queue = queue;
_flowRuleProcessor = flowRuleProcessor;
_caching = factory.GetCachingProvider(_hc_Caching);
_dev =Lazy.Create(async ()=>await GetSessionDataAsync<Device>(nameof(Device)));
_service = mqttService;
}
[MqttRoute("{devname}/telemetry/xml/{keyname}")]
public Task telemetry_xml(string devname,string keyname)
{
var device = _dev.JudgeOrCreateNewDevice(devname, _scopeFactor, _logger);
Dictionary<string, object> keyValues = new Dictionary<string, object>();
try
{
var xml = new System.Xml.XmlDocument();
xml.LoadXml(Message.ConvertPayloadToString());
keyValues.Add(keyname, xml);
_queue.PublishTelemetryData(device, keyValues);
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"{ex.Message}");
}
return Ok();
}
[MqttRoute("{devname}/telemetry/binary/{keyname}")]
public Task telemetry_binary(string devname, string keyname)
{
var device = _dev.JudgeOrCreateNewDevice(devname, _scopeFactor, _logger);
Dictionary<string, object> keyValues = new Dictionary<string, object>();
try
{
keyValues.Add(keyname, Message.Payload);
_queue.PublishTelemetryData(device, keyValues);
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"{ex.Message}");
}
return Ok();
}
[MqttRoute("{devname}/attributes/xml/{keyname}")]
public Task attributes_xml(string devname, string keyname)
{
var device = _dev.JudgeOrCreateNewDevice(devname, _scopeFactor, _logger);
Dictionary<string, object> keyValues = new Dictionary<string, object>();
try
{
var xml = new System.Xml.XmlDocument();
xml.LoadXml(Message.ConvertPayloadToString());
keyValues.Add(keyname, xml);
_queue.PublishAttributeData(device, keyValues);
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"{ex.Message}");
}
return Ok();
}
[MqttRoute("{devname}/attributes/binary/{keyname}")]
public Task attributes_binary(string devname, string keyname)
{
var device = _dev.JudgeOrCreateNewDevice(devname, _scopeFactor, _logger);
Dictionary<string, object> keyValues = new Dictionary<string, object>();
try
{
keyValues.Add(keyname, Message.Payload);
_queue.PublishAttributeData(device, keyValues);
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"{ex.Message}");
}
return Ok();
}
// Supports template routing with typed constraints
[MqttRoute("{devname}/attributes")]
public Task attributes(string devname)
{
var device = _dev.JudgeOrCreateNewDevice(devname, _scopeFactor, _logger);
Dictionary<string, object> keyValues = new Dictionary<string, object>();
try
{
if (Message.Payload?.Length > 0)
{
keyValues = Message.ConvertPayloadToDictionary();
_queue.PublishAttributeData(device, keyValues);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"{ex.Message}");
}
return Ok();
}
[MqttRoute("{devname}/telemetry")]
public Task telemetry(string devname)
{
var device = _dev.JudgeOrCreateNewDevice(devname, _scopeFactor, _logger);
Dictionary<string, object> keyValues = new Dictionary<string, object>();
try
{
if (Message.Payload?.Length > 0)
{
keyValues = Message.ConvertPayloadToDictionary();
_queue.PublishAttributeData(device, keyValues);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"{ex.Message}");
}
return Ok();
}
[MqttRoute("{devname}/attributes/request/{keyname}/{requestid}/xml")]
public async Task RequestAttributes(string devname, string keyname, string requestid)
{
var device = _dev.JudgeOrCreateNewDevice(devname, _scopeFactor, _logger);
Dictionary<string, object> keyValues = new Dictionary<string, object>();
try
{
using (var scope = _scopeFactor.CreateScope())
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var qf = from at in _dbContext.AttributeLatest where at.Type == DataType.XML && at.KeyName == keyname select at;
await qf.LoadAsync();
await Server.PublishAsync(ClientId, $"devices/me/attributes/response/{requestid}", qf.FirstOrDefault()?.Value_XML);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"{ex.Message}");
}
}
[MqttRoute("{devname}/attributes/request/{keyname}/{requestid}/binary")]
public async Task RequestAttributes_binary(string devname, string keyname, string requestid)
{
var device = _dev.JudgeOrCreateNewDevice(devname, _scopeFactor, _logger);
Dictionary<string, object> keyValues = new Dictionary<string, object>();
try
{
using (var scope = _scopeFactor.CreateScope())
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var qf = from at in _dbContext.AttributeLatest where at.Type == DataType.Binary && at.KeyName == keyname select at;
await qf.LoadAsync();
await Server.PublishAsync(ClientId, $"devices/me/attributes/response/{requestid}", qf.FirstOrDefault()?.Value_Binary);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"{ex.Message}");
}
}
[MqttRoute("{devname}/attributes/request/{requestid}")]
public async Task RequestAttributes(string devname,string requestid)
{
var device = _dev.JudgeOrCreateNewDevice(devname, _scopeFactor, _logger);
Dictionary<string, object> keyValues = new Dictionary<string, object>();
try
{
using (var scope = _scopeFactor.CreateScope())
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
Dictionary<string, object> reps = new Dictionary<string, object>();
var reqid = requestid;
List<AttributeLatest> datas = new List<AttributeLatest>();
foreach (var kx in keyValues)
{
var keys = kx.Value?.ToString().Split(',');
if (keys != null && keys.Length > 0)
{
if (Enum.TryParse(kx.Key, true, out DataSide ds))
{
var qf = from at in _dbContext.AttributeLatest where at.DeviceId == device.Id && keys.Contains(at.KeyName) select at;
await qf.LoadAsync();
if (ds == DataSide.AnySide)
{
datas.AddRange(await qf.ToArrayAsync());
}
else
{
var qx = from at in qf where at.DataSide == ds select at;
datas.AddRange(await qx.ToArrayAsync());
}
}
}
}
foreach (var item in datas)
{
switch (item.Type)
{
case DataType.Boolean:
reps.Add(item.KeyName, item.Value_Boolean);
break;
case DataType.String:
reps.Add(item.KeyName, item.Value_String);
break;
case DataType.Long:
reps.Add(item.KeyName, item.Value_Long);
break;
case DataType.Double:
reps.Add(item.KeyName, item.Value_Double);
break;
case DataType.Json:
reps.Add(item.KeyName, Newtonsoft.Json.Linq.JToken.Parse(item.Value_Json));
break;
case DataType.XML:
reps.Add(item.KeyName, item.Value_XML);
break;
case DataType.Binary:
reps.Add(item.KeyName, item.Value_Binary);
break;
case DataType.DateTime:
reps.Add(item.KeyName, item.Value_DateTime);
break;
default:
reps.Add(item.KeyName, item.Value_Json);
break;
}
await Server.PublishAsync(ClientId, $"devices/me/attributes/response/{requestid}", reps);
}
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, $"{ex.Message}");
}
}
}
}
......@@ -388,7 +388,7 @@ namespace IoTSharp.Services
return device;
}
private async Task RequestAttributes(string[] tpary, string senderClientId, Dictionary<string, object> keyValues, Device device)
internal async Task RequestAttributes(string[] tpary, string senderClientId, Dictionary<string, object> keyValues, Device device)
{
using (var scope = _scopeFactor.CreateScope())
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
......
Subproject commit 276b7809d8859a38ff9ad51201901b9d6e503aca
Subproject commit 3f45daf0eb11f6267ef095fcdbe925a6b4731a01
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册