提交 4098d587 编写于 作者: 麦壳饼's avatar 麦壳饼

add built-in mqtt broker and mqtt client

上级 ce13ecc1
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Authentication;
using System.Threading.Tasks;
namespace IoTSharp.Hub
......@@ -11,5 +12,33 @@ namespace IoTSharp.Hub
public string JwtKey { get; set; }
public string JwtExpireDays { get; set; }
public string JwtIssuer { get; set; }
/// <summary>
/// Broker settings
/// </summary>
public MqttBrokerSetting MqttBroker { get; set; }
/// <summary>
/// mqtt client settings
/// </summary>
public MqttClientSetting MqttClient { get; set; }
}
public class MqttClientSetting
{
/// <summary>
/// built-in or IP、HostName
/// </summary>
public string MqttBroker { get; set; } = "built-in";
public string UserName { get; set; } = "root";
public string Password { get; set; } = "kissme";
public int Port { get; set; } = 1883;
}
public class MqttBrokerSetting
{
public int Port { get; set; } = 1883;
public int TlsPort { get; set; } = 8883;
public bool EnableTls { get; set; } = false;
public string Certificate { get; set; }
public SslProtocols SslProtocol { get; set; } = SslProtocols.None;
}
}
......@@ -23,63 +23,23 @@ namespace IoTSharp.Hub
case "mssql":
services.AddEntityFrameworkSqlServer();
services.AddDbContext<ApplicationDbContext>(options => options.UseSqlServer(_ConnectionString), ServiceLifetime.Transient);
#if WithHealthChecks
services.AddHealthChecks().AddSqlServer(_ConnectionString, name: "database").AddMQTTChatHealthChecks();
#endif
break;
case "npgsql":
services.AddEntityFrameworkNpgsql();
services.AddDbContext<ApplicationDbContext>(options => options.UseNpgsql(_ConnectionString), ServiceLifetime.Transient);
#if WithHealthChecks
services.AddHealthChecks().AddNpgSql(_ConnectionString, name: "database").AddMQTTChatHealthChecks();
#endif
break;
case "memory":
services.AddEntityFrameworkInMemoryDatabase();
services.AddDbContext<ApplicationDbContext>(options => options.UseInMemoryDatabase(nameof(ApplicationDbContext)), ServiceLifetime.Transient);
#if WithHealthChecks
services.AddHealthChecks().AddMQTTChatHealthChecks();
#endif
break;
case "sqlite":
default:
services.AddEntityFrameworkSqlite();
services.AddDbContext<ApplicationDbContext>(options => options.UseSqlite(_ConnectionString), ServiceLifetime.Transient);
#if WithHealthChecks
services.AddHealthChecks().AddSqlite(_ConnectionString, name: "database").AddMQTTChatHealthChecks();
#endif
break;
}
#if WithHealthChecks
services.AddHealthChecksUI();
#endif
}
#if WithHealthChecks
internal static void UseIotSharpHealthChecks(this IApplicationBuilder app)
{
app.UseHealthChecksUI();
app.UseHealthChecks("/health", new HealthCheckOptions()
{
Predicate = _ => true,
ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse
});
}
private static void AddMQTTChatHealthChecks(this IHealthChecksBuilder builder)
{
builder.AddPrivateMemoryHealthCheck(1024 * 1024 * 1024, "privatememory")
.AddDiskStorageHealthCheck(setup =>
{
DriveInfo.GetDrives().ToList().ForEach(di =>
{
setup.AddDrive(di.Name, 1024);
});
});
}
#endif
private static string GetFullPathName(string filename)
{
FileInfo fi = new FileInfo(System.IO.Path.Combine(
......@@ -89,36 +49,24 @@ namespace IoTSharp.Hub
return fi.FullName;
}
public static void UseMqttLogger(this IApplicationBuilder app)
internal static void UseSwagger(this IApplicationBuilder app)
{
var mqttNetLogger = app.ApplicationServices.GetService<IMqttNetLogger>();
var _loggerFactory = app.ApplicationServices.GetService<ILoggerFactory>();
var logger = _loggerFactory.CreateLogger<IMqttNetLogger>();
mqttNetLogger.LogMessagePublished += (object sender, MqttNetLogMessagePublishedEventArgs e) =>
app.UseSwaggerUi3();
app.UseSwagger(config => config.PostProcess = (document, request) =>
{
var message = $"ID:{e.TraceMessage.LogId},ThreadId:{e.TraceMessage.ThreadId},Source:{e.TraceMessage.Source},Timestamp:{e.TraceMessage.Timestamp},Message:{e.TraceMessage.Message}";
switch (e.TraceMessage.Level)
if (request.Headers.ContainsKey("X-External-Host"))
{
case MqttNetLogLevel.Verbose:
logger.LogTrace(e.TraceMessage.Exception, message);
break;
case MqttNetLogLevel.Info:
logger.LogInformation(e.TraceMessage.Exception, message);
break;
case MqttNetLogLevel.Warning:
logger.LogWarning(e.TraceMessage.Exception, message);
break;
case MqttNetLogLevel.Error:
logger.LogError(e.TraceMessage.Exception, message);
break;
default:
break;
// Change document server settings to public
document.Host = request.Headers["X-External-Host"].First();
document.BasePath = request.Headers["X-External-Path"].First();
}
};
});
app.UseSwaggerUi3(config => config.TransformToExternalPath = (internalUiRoute, request) =>
{
// The header X-External-Path is set in the nginx.conf file
var externalPath = request.Headers.ContainsKey("X-External-Path") ? request.Headers["X-External-Path"].First() : "";
return externalPath + internalUiRoute;
});
}
}
}
\ No newline at end of file
using Microsoft.Extensions.Logging;
using MQTTnet.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.AspNetCore.Builder;
using MQTTnet.AspNetCore;
using MQTTnet.Diagnostics;
namespace IoTSharp.Hub
{
public static class MqttExtension
{
public static void AddMqttClient(this IServiceCollection services, MqttClientSetting setting)
{
if (setting == null) setting = new MqttClientSetting();
services.AddSingleton(options => new MQTTnet.MqttFactory().CreateMqttClient());
services.AddTransient(options => new MqttClientOptionsBuilder()
.WithClientId("buind-in")
.WithTcpServer((setting.MqttBroker == "built-in" || string.IsNullOrEmpty(setting.MqttBroker)) ? "127.0.0.1" : setting.MqttBroker, setting.Port)
.WithCredentials(setting.UserName, setting.Password)
.WithCleanSession()
.Build());
}
public static void UseIoTSharpMqttClient(this IApplicationBuilder app)
{
var _logger = app.ApplicationServices.GetService<ILoggerFactory>().CreateLogger<MqttClient>();
var mqtt = app.ApplicationServices.GetService<IMqttClient>();
var clientOptions = app.ApplicationServices.GetService<IMqttClientOptions>();
mqtt.ApplicationMessageReceived += (sender, e) =>
{
_logger.LogInformation($"Received : {e.ApplicationMessage.Topic}");
};
mqtt.Connected += (sender, e) =>
{
_logger.LogInformation($"CONNECTED IsSessionPresent: {e.IsSessionPresent}");
};
mqtt.Disconnected += async (s, e) =>
{
_logger.LogInformation($"DISCONNECTED FROM SERVER ClientWasConnected:{e.ClientWasConnected}, Exception={ e.Exception.Message}");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await mqtt.ConnectAsync(clientOptions);
_logger.LogInformation("RECONNECT AGAIN");
}
catch (Exception exception)
{
_logger.LogError("CONNECTING FAILED", exception);
}
};
try
{
Task.Run(() =>
{
mqtt.ConnectAsync(clientOptions);
_logger.LogInformation("CONNECTED");
});
}
catch (Exception exception)
{
_logger.LogError("CONNECTING FAILED", exception);
}
}
public static void AddIoTSharpMqttServer( this IServiceCollection services,MqttBrokerSetting setting)
{
services.AddMqttTcpServerAdapter();
services.AddHostedMqttServer(options =>
{
var broker = setting;
if (broker == null) broker = new MqttBrokerSetting();
options.WithDefaultEndpointPort(broker.Port).WithDefaultEndpointBoundIPAddress(System.Net.IPAddress.Parse("127.0.0.1")).WithDefaultEndpoint();
if (broker.EnableTls)
{
options.WithEncryptedEndpoint();
options.WithEncryptedEndpointPort(broker.TlsPort);
if (System.IO.File.Exists(broker.Certificate))
{
options.WithEncryptionCertificate(System.IO.File.ReadAllBytes(broker.Certificate)).WithEncryptionSslProtocol(broker.SslProtocol);
}
}
else
{
options.WithoutEncryptedEndpoint();
}
options.WithConnectionValidator(action =>
{
action.ReturnCode = MQTTnet.Protocol.MqttConnectReturnCode.ConnectionAccepted;
});
options.Build();
});
services.AddMqttConnectionHandler();
services.AddMqttWebSocketServerAdapter();
}
public static void UseIotSharpMqttServer(this IApplicationBuilder app)
{
app.UseMqttEndpoint();
app.UseMqttServer(server =>
{
server.ClientConnected += (sender, e) => { };
server.Started += (sender, e) => { };
server.Stopped += (sender, e) => { };
server.ApplicationMessageReceived += (sender, e) => { };
server.ClientSubscribedTopic += (sender, e) => { };
server.ClientUnsubscribedTopic += (sender, e) => { };
});
var mqttNetLogger = app.ApplicationServices.GetService<IMqttNetLogger>();
var _loggerFactory = app.ApplicationServices.GetService<ILoggerFactory>();
var logger = _loggerFactory.CreateLogger<IMqttNetLogger>();
mqttNetLogger.LogMessagePublished += (object sender, MqttNetLogMessagePublishedEventArgs e) =>
{
var message = $"ID:{e.TraceMessage.LogId},ThreadId:{e.TraceMessage.ThreadId},Source:{e.TraceMessage.Source},Timestamp:{e.TraceMessage.Timestamp},Message:{e.TraceMessage.Message}";
switch (e.TraceMessage.Level)
{
case MqttNetLogLevel.Verbose:
logger.LogTrace(e.TraceMessage.Exception, message);
break;
case MqttNetLogLevel.Info:
logger.LogInformation(e.TraceMessage.Exception, message);
break;
case MqttNetLogLevel.Warning:
logger.LogWarning(e.TraceMessage.Exception, message);
break;
case MqttNetLogLevel.Error:
logger.LogError(e.TraceMessage.Exception, message);
break;
default:
break;
}
};
}
}
}
......@@ -25,10 +25,6 @@
<None Remove="Migrations\**" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.NpgSql" Version="2.2.0" />
<PackageReference Include="AspNetCore.HealthChecks.System" Version="2.2.1" />
<PackageReference Include="AspNetCore.HealthChecks.UI" Version="2.2.10" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="2.2.2" />
<PackageReference Include="IoTSharp.X509Extensions" Version="1.3.3" />
<PackageReference Include="LiteDB" Version="4.1.4" />
<PackageReference Include="Microsoft.AspNetCore.App" />
......@@ -44,10 +40,15 @@
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite.Design" Version="1.1.6" />
<PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="2.2.1" />
<PackageReference Include="MQTTnet" Version="2.8.5" />
<PackageReference Include="MQTTnet.AspNetCore" Version="2.8.5" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="2.2.0" />
<PackageReference Include="NSwag.AspNetCore" Version="12.0.13" />
<PackageReference Include="QuartzHostedService" Version="0.0.4" />
<PackageReference Include="System.Text.Encoding.CodePages" Version="4.5.1" />
</ItemGroup>
<ProjectExtensions><VisualStudio><UserProperties appsettings_1Development_1json__JSONSchema="http://json.schemastore.org/bungee-plugin" /></VisualStudio></ProjectExtensions>
<ProjectExtensions>
<VisualStudio>
<UserProperties appsettings_1Development_1json__JSONSchema="http://json.schemastore.org/bungee-plugin" />
</VisualStudio>
</ProjectExtensions>
</Project>
\ No newline at end of file
......@@ -10,12 +10,15 @@ using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using MQTTnet.AspNetCore;
using MQTTnet.Client;
using NSwag.AspNetCore;
using System;
using System.IdentityModel.Tokens.Jwt;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace IoTSharp.Hub
{
......@@ -68,8 +71,14 @@ namespace IoTSharp.Hub
});
services.AddTransient<ApplicationDBInitializer>();
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
services.AddIoTSharpMqttServer(AppSettings.MqttBroker);
services.AddMqttClient(AppSettings.MqttClient);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
......@@ -91,30 +100,17 @@ namespace IoTSharp.Hub
app.UseStaticFiles();
app.UseHttpsRedirection();
app.UseStaticFiles();
app.UseSwagger();
app.UseIoTSharpMqttClient();
app.UseIotSharpMqttServer();
app.UseForwardedHeaders(new ForwardedHeadersOptions
{
ForwardedHeaders = ForwardedHeaders.XForwardedFor | ForwardedHeaders.XForwardedProto
});
app.UseSwaggerUi3();
app.UseSwagger(config => config.PostProcess = (document, request) =>
{
if (request.Headers.ContainsKey("X-External-Host"))
{
// Change document server settings to public
document.Host = request.Headers["X-External-Host"].First();
document.BasePath = request.Headers["X-External-Path"].First();
}
});
app.UseSwaggerUi3(config => config.TransformToExternalPath = (internalUiRoute, request) =>
{
// The header X-External-Path is set in the nginx.conf file
var externalPath = request.Headers.ContainsKey("X-External-Path") ? request.Headers["X-External-Path"].First() : "";
return externalPath + internalUiRoute;
});
#if WithHealthChecks
app.UseIotSharpHealthChecks();
#endif
}
}
}
\ No newline at end of file
......@@ -12,4 +12,5 @@
"JwtExpireDays": 1,
"JwtIssuer": "IoTSharp.Hub",
"AllowedHosts": "*"
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册