提交 ed31c512 编写于 作者: C Christian

Code cleanup

上级 e01707d2
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
#if !WINDOWS_UWP
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Internal;
using MQTTnet.Certificates;
using MQTTnet.Server;
namespace MQTTnet.Implementations
{
public sealed class MqttTcpServerAdapter : IMqttServerAdapter
{
readonly List<MqttTcpServerListener> _listeners = new List<MqttTcpServerListener>();
CancellationTokenSource _cancellationTokenSource;
MqttServerOptions _serverOptions;
CancellationTokenSource _cancellationTokenSource;
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }
public bool TreatSocketOpeningErrorAsWarning { get; set; }
public void Dispose()
{
Cleanup();
}
public Task StartAsync(MqttServerOptions options, IMqttNetLogger logger)
{
if (_cancellationTokenSource != null) throw new InvalidOperationException("Server is already started.");
if (_cancellationTokenSource != null)
{
throw new InvalidOperationException("Server is already started.");
}
_serverOptions = options;
_cancellationTokenSource = new CancellationTokenSource();
if (options.DefaultEndpointOptions.IsEnabled)
......@@ -61,11 +67,6 @@ namespace MQTTnet.Implementations
return CompletedTask.Instance;
}
public void Dispose()
{
Cleanup();
}
void Cleanup()
{
try
......@@ -86,9 +87,19 @@ namespace MQTTnet.Implementations
}
}
void RegisterListeners(MqttServerTcpEndpointBaseOptions tcpEndpointOptions, IMqttNetLogger logger, CancellationToken cancellationToken)
Task OnClientAcceptedAsync(IMqttChannelAdapter channelAdapter)
{
var clientHandler = ClientHandler;
if (clientHandler == null)
{
return CompletedTask.Instance;
}
return clientHandler(channelAdapter);
}
void RegisterListeners(MqttServerTcpEndpointBaseOptions tcpEndpointOptions, IMqttNetLogger logger, CancellationToken cancellationToken)
{
if (!tcpEndpointOptions.BoundInterNetworkAddress.Equals(IPAddress.None))
{
var listenerV4 = new MqttTcpServerListener(AddressFamily.InterNetwork, _serverOptions, tcpEndpointOptions, logger)
......@@ -115,17 +126,6 @@ namespace MQTTnet.Implementations
}
}
}
Task OnClientAcceptedAsync(IMqttChannelAdapter channelAdapter)
{
var clientHandler = ClientHandler;
if (clientHandler == null)
{
return CompletedTask.Instance;
}
return clientHandler(channelAdapter);
}
}
}
#endif
\ No newline at end of file
......@@ -4,7 +4,6 @@
#if !WINDOWS_UWP
using MQTTnet.Adapter;
using MQTTnet.Certificates;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Internal;
......@@ -28,8 +27,7 @@ namespace MQTTnet.Implementations
readonly MqttServerOptions _serverOptions;
readonly MqttServerTcpEndpointBaseOptions _options;
readonly MqttServerTlsTcpEndpointOptions _tlsOptions;
X509Certificate2 _tlsCertificate;
CrossPlatformSocket _socket;
IPEndPoint _localEndPoint;
......@@ -48,7 +46,6 @@ namespace MQTTnet.Implementations
if (_options is MqttServerTlsTcpEndpointOptions tlsOptions)
{
_tlsOptions = tlsOptions;
_tlsCertificate = _tlsOptions.CertificateProvider.GetCertificate();
}
}
......@@ -66,7 +63,7 @@ namespace MQTTnet.Implementations
_localEndPoint = new IPEndPoint(boundIp, _options.Port);
_logger.Info("Starting TCP listener (Endpoint='{0}', TLS={1}).", _localEndPoint, _tlsCertificate != null);
_logger.Info("Starting TCP listener (Endpoint={0}, TLS={1})", _localEndPoint, _tlsOptions?.CertificateProvider != null);
_socket = new CrossPlatformSocket(_addressFamily);
......@@ -115,7 +112,7 @@ namespace MQTTnet.Implementations
_socket.Listen(_options.ConnectionBacklog);
_logger.Verbose("TCP listener started (Endpoint='{0}'.", _localEndPoint);
_logger.Verbose("TCP listener started (Endpoint={0})", _localEndPoint);
Task.Run(() => AcceptClientConnectionsAsync(cancellationToken), cancellationToken).RunInBackground(_logger);
......@@ -128,7 +125,7 @@ namespace MQTTnet.Implementations
throw;
}
_logger.Warning(exception, "Error while creating listener socket for local end point '{0}'.", _localEndPoint);
_logger.Warning(exception, "Error while starting TCP listener (Endpoint={0})", _localEndPoint);
return false;
}
}
......@@ -136,10 +133,6 @@ namespace MQTTnet.Implementations
public void Dispose()
{
_socket?.Dispose();
#if !NET452
_tlsCertificate?.Dispose();
#endif
}
async Task AcceptClientConnectionsAsync(CancellationToken cancellationToken)
......@@ -170,7 +163,7 @@ namespace MQTTnet.Implementations
}
}
_logger.Error(exception, "Error while accepting connection at TCP listener {0} TLS={1}.", _localEndPoint, _tlsCertificate != null);
_logger.Error(exception, "Error while accepting TCP connection (Endpoint={0})", _localEndPoint);
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
}
}
......@@ -185,31 +178,15 @@ namespace MQTTnet.Implementations
{
remoteEndPoint = clientSocket.RemoteEndPoint.ToString();
_logger.Verbose("Client '{0}' accepted by TCP listener '{1}, {2}'.",
remoteEndPoint,
_localEndPoint,
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6");
_logger.Verbose("TCP client '{0}' accepted (Local endpoint={1})", remoteEndPoint, _localEndPoint);
clientSocket.NoDelay = _options.NoDelay;
stream = clientSocket.GetStream();
X509Certificate2 clientCertificate = null;
if (_tlsOptions.CertificateProvider != null)
{
var tlsCertificate = _tlsOptions.CertificateProvider.GetCertificate();
if (tlsCertificate != _tlsCertificate)
{
var oldTlsCertificate = _tlsCertificate;
_tlsCertificate = tlsCertificate;
#if !NET452
oldTlsCertificate.Dispose();
#endif
}
}
if (_tlsCertificate != null)
var clientCertificate = _tlsOptions?.CertificateProvider?.GetCertificate();
if (clientCertificate != null)
{
if (!_tlsCertificate.HasPrivateKey)
if (!clientCertificate.HasPrivateKey)
{
throw new InvalidOperationException("The certificate for TLS encryption must contain the private key.");
}
......@@ -218,9 +195,9 @@ namespace MQTTnet.Implementations
#if NETCOREAPP3_1_OR_GREATER
await sslStream.AuthenticateAsServerAsync(
new SslServerAuthenticationOptions()
new SslServerAuthenticationOptions
{
ServerCertificate = _tlsCertificate,
ServerCertificate = clientCertificate,
ClientCertificateRequired = _tlsOptions.ClientCertificateRequired,
EnabledSslProtocols = _tlsOptions.SslProtocol,
CertificateRevocationCheckMode = _tlsOptions.CheckCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck,
......@@ -229,7 +206,7 @@ namespace MQTTnet.Implementations
}).ConfigureAwait(false);
#else
await sslStream.AuthenticateAsServerAsync(
_tlsCertificate,
clientCertificate,
_tlsOptions.ClientCertificateRequired,
_tlsOptions.SslProtocol,
_tlsOptions.CheckCertificateRevocation).ConfigureAwait(false);
......@@ -272,7 +249,7 @@ namespace MQTTnet.Implementations
return;
}
_logger.Error(exception, "Error while handling client connection.");
_logger.Error(exception, "Error while handling TCP client connection");
}
finally
{
......@@ -287,10 +264,7 @@ namespace MQTTnet.Implementations
}
}
_logger.Verbose("Client '{0}' disconnected at TCP listener '{1}, {2}'.",
remoteEndPoint,
_localEndPoint,
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6");
_logger.Verbose("TCP client '{0}' disconnected (Local endpoint={1})", remoteEndPoint, _localEndPoint);
}
}
}
......
......@@ -407,13 +407,13 @@ namespace MQTTnet.Server
{
// This is a MQTT V5+ feature.
return;
}
}
if (createSubscriptionResult.Subscription.RetainHandling == MqttRetainHandling.SendAtSubscribeIfNewSubscriptionOnly && !createSubscriptionResult.IsNewSubscription)
{
// This is a MQTT V5+ feature.
{
// This is a MQTT V5+ feature.
return;
}
}
for (var index = retainedMessages.Count - 1; index >= 0; index--)
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册