未验证 提交 2fa7200f 编写于 作者: C Christian 提交者: GitHub

1544 add underlying socketoption of tcp keepalive into the server option (#1606)

* Expose TCP keep alive options to options builder

* Update ReleaseNotes.md
上级 9f7d15e1
......@@ -3,3 +3,4 @@
* [Client] MQTTv5 features are now checked and an exception is thrown if they are used when using protocol version 3.1.1 and lower. These checks can be disabled in client options. (BREAKING CHANGE!).
* [Server] Exposed MQTT v5 sent properties from the affected client in _ClientDisconnectedAsync_ event.
* [Server] Fixed wrong client ID passed to _InterceptingUnsubscriptionEventArgs_ (#1631, thanks to @ghord).
* [Server] Exposed socket settings for TCP keep alive in TCP options (#1544).
......@@ -72,8 +72,8 @@ namespace MQTTnet.Implementations
get => _socket.GetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval) as int? ?? 0;
set => _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, value);
#else
get { throw new NotSupportedException("TcpKeepAliveInterval requires at least net5.0."); }
set { throw new NotSupportedException("TcpKeepAliveInterval requires at least net5.0."); }
get { throw new NotSupportedException("TcpKeepAliveInterval requires at least netcoreapp3.0."); }
set { throw new NotSupportedException("TcpKeepAliveInterval requires at least netcoreapp3.0."); }
#endif
}
......@@ -83,8 +83,8 @@ namespace MQTTnet.Implementations
get => _socket.GetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount) as int? ?? 0;
set => _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, value);
#else
get { throw new NotSupportedException("TcpKeepAliveRetryCount requires at least net5.0."); }
set { throw new NotSupportedException("TcpKeepAliveRetryCount requires at least net5.0."); }
get { throw new NotSupportedException("TcpKeepAliveRetryCount requires at least netcoreapp3.0."); }
set { throw new NotSupportedException("TcpKeepAliveRetryCount requires at least netcoreapp3.0."); }
#endif
}
......@@ -94,8 +94,8 @@ namespace MQTTnet.Implementations
get => _socket.GetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime) as int? ?? 0;
set => _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, value);
#else
get { throw new NotSupportedException("TcpKeepAliveTime requires at least net5.0."); }
set { throw new NotSupportedException("TcpKeepAliveTime requires at least net5.0."); }
get { throw new NotSupportedException("TcpKeepAliveTime requires at least netcoreapp3.0."); }
set { throw new NotSupportedException("TcpKeepAliveTime requires at least netcoreapp3.0."); }
#endif
}
......@@ -155,7 +155,7 @@ namespace MQTTnet.Implementations
}
catch (ObjectDisposedException)
{
// This will happen when _socket.EndAccept gets called by Task library but the socket is already disposed.
// This will happen when _socket.EndAccept_ gets called by Task library but the socket is already disposed.
return null;
}
}
......@@ -181,16 +181,18 @@ namespace MQTTnet.Implementations
try
{
#if NET5_0_OR_GREATER
#if NETCOREAPP3_0_OR_GREATER
if (_networkStream != null)
{
await _networkStream.DisposeAsync().ConfigureAwait(false);
}
await _socket.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false);
#else
_networkStream?.Dispose();
#endif
#if NET5_0_OR_GREATER
await _socket.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false);
#else
// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(_socketDisposeAction))
{
......@@ -219,7 +221,7 @@ namespace MQTTnet.Implementations
}
catch (ObjectDisposedException)
{
// This will happen when _socket.EndConnect gets called by Task library but the socket is already disposed.
// This will happen when _socket.EndConnect_ gets called by Task library but the socket is already disposed.
cancellationToken.ThrowIfCancellationRequested();
}
}
......@@ -246,38 +248,44 @@ namespace MQTTnet.Implementations
_socket.Listen(connectionBacklog);
}
#if NET452 || NET461
public async Task<int> ReceiveAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
{
try
{
#if NET452 || NET461
return await Task.Factory.FromAsync(SocketWrapper.BeginReceive, _socket.EndReceive, new SocketWrapper(_socket, buffer, socketFlags)).ConfigureAwait(false);
#else
return await _socket.ReceiveAsync(buffer, socketFlags).ConfigureAwait(false);
#endif
}
catch (ObjectDisposedException)
{
// This will happen when _socket.EndReceive gets called by Task library but the socket is already disposed.
// This will happen when _socket.EndReceive_ gets called by Task library but the socket is already disposed.
return -1;
}
}
#else
public Task<int> ReceiveAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
{
return _socket.ReceiveAsync(buffer, socketFlags);
}
#endif
#if NET452 || NET461
public async Task SendAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
{
try
{
#if NET452 || NET461
await Task.Factory.FromAsync(SocketWrapper.BeginSend, _socket.EndSend, new SocketWrapper(_socket, buffer, socketFlags)).ConfigureAwait(false);
#else
await _socket.SendAsync(buffer, socketFlags).ConfigureAwait(false);
#endif
}
catch (ObjectDisposedException)
{
// This will happen when _socket.EndConnect gets called by Task library but the socket is already disposed.
// This will happen when _socket.EndSend_ gets called by Task library but the socket is already disposed.
}
}
#else
public Task SendAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)
{
return _socket.SendAsync(buffer, socketFlags);
}
#endif
#if NET452 || NET461
sealed class SocketWrapper
......
......@@ -86,6 +86,26 @@ namespace MQTTnet.Implementations
_socket.LingerState = _options.LingerState;
}
if (_options.KeepAlive.HasValue)
{
_socket.KeepAlive = _options.KeepAlive.Value;
}
if (_options.TcpKeepAliveInterval.HasValue)
{
_socket.TcpKeepAliveInterval = _options.TcpKeepAliveInterval.Value;
}
if (_options.TcpKeepAliveRetryCount.HasValue)
{
_socket.TcpKeepAliveInterval = _options.TcpKeepAliveRetryCount.Value;
}
if (_options.TcpKeepAliveTime.HasValue)
{
_socket.TcpKeepAliveTime = _options.TcpKeepAliveTime.Value;
}
_socket.Bind(_localEndPoint);
// Get the local endpoint back from the socket. The port may have changed.
......
......@@ -7,8 +7,6 @@ using System.Net;
using System.Net.Security;
using System.Security.Authentication;
using MQTTnet.Certificates;
using System.Threading.Tasks;
#if !WINDOWS_UWP
using System.Security.Cryptography.X509Certificates;
#endif
......@@ -19,17 +17,26 @@ namespace MQTTnet.Server
public class MqttServerOptionsBuilder
{
readonly MqttServerOptions _options = new MqttServerOptions();
public MqttServerOptionsBuilder WithConnectionBacklog(int value)
public MqttServerOptions Build()
{
_options.DefaultEndpointOptions.ConnectionBacklog = value;
_options.TlsEndpointOptions.ConnectionBacklog = value;
return _options;
}
#if !WINDOWS_UWP
public MqttServerOptionsBuilder WithClientCertificate(RemoteCertificateValidationCallback validationCallback = null, bool checkCertificateRevocation = false)
{
_options.TlsEndpointOptions.ClientCertificateRequired = true;
_options.TlsEndpointOptions.CheckCertificateRevocation = checkCertificateRevocation;
_options.TlsEndpointOptions.RemoteCertificateValidationCallback = validationCallback;
return this;
}
#endif
public MqttServerOptionsBuilder WithMaxPendingMessagesPerClient(int value)
public MqttServerOptionsBuilder WithConnectionBacklog(int value)
{
_options.MaxPendingMessagesPerClient = value;
_options.DefaultEndpointOptions.ConnectionBacklog = value;
_options.TlsEndpointOptions.ConnectionBacklog = value;
return this;
}
......@@ -45,12 +52,6 @@ namespace MQTTnet.Server
return this;
}
public MqttServerOptionsBuilder WithDefaultEndpointPort(int value)
{
_options.DefaultEndpointOptions.Port = value;
return this;
}
public MqttServerOptionsBuilder WithDefaultEndpointBoundIPAddress(IPAddress value)
{
_options.DefaultEndpointOptions.BoundInterNetworkAddress = value ?? IPAddress.Any;
......@@ -63,21 +64,21 @@ namespace MQTTnet.Server
return this;
}
public MqttServerOptionsBuilder WithoutDefaultEndpoint()
public MqttServerOptionsBuilder WithDefaultEndpointPort(int value)
{
_options.DefaultEndpointOptions.IsEnabled = false;
_options.DefaultEndpointOptions.Port = value;
return this;
}
public MqttServerOptionsBuilder WithEncryptedEndpoint()
public MqttServerOptionsBuilder WithDefaultEndpointReuseAddress()
{
_options.TlsEndpointOptions.IsEnabled = true;
_options.DefaultEndpointOptions.ReuseAddress = true;
return this;
}
public MqttServerOptionsBuilder WithEncryptedEndpointPort(int value)
public MqttServerOptionsBuilder WithEncryptedEndpoint()
{
_options.TlsEndpointOptions.Port = value;
_options.TlsEndpointOptions.IsEnabled = true;
return this;
}
......@@ -93,43 +94,36 @@ namespace MQTTnet.Server
return this;
}
#if !WINDOWS_UWP
public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value, IMqttServerCertificateCredentials credentials = null)
public MqttServerOptionsBuilder WithEncryptedEndpointPort(int value)
{
if (value == null) throw new ArgumentNullException(nameof(value));
_options.TlsEndpointOptions.CertificateProvider = new BlobCertificateProvider(value)
{
Password = credentials?.Password
};
_options.TlsEndpointOptions.Port = value;
return this;
}
public MqttServerOptionsBuilder WithEncryptionCertificate(X509Certificate2 certificate)
public MqttServerOptionsBuilder WithEncryptionSslProtocol(SslProtocols value)
{
if (certificate == null) throw new ArgumentNullException(nameof(certificate));
_options.TlsEndpointOptions.SslProtocol = value;
return this;
}
_options.TlsEndpointOptions.CertificateProvider = new X509CertificateProvider(certificate);
public MqttServerOptionsBuilder WithKeepAlive()
{
_options.DefaultEndpointOptions.KeepAlive = true;
_options.TlsEndpointOptions.KeepAlive = true;
return this;
}
#endif
public MqttServerOptionsBuilder WithEncryptionSslProtocol(SslProtocols value)
public MqttServerOptionsBuilder WithMaxPendingMessagesPerClient(int value)
{
_options.TlsEndpointOptions.SslProtocol = value;
_options.MaxPendingMessagesPerClient = value;
return this;
}
#if !WINDOWS_UWP
public MqttServerOptionsBuilder WithClientCertificate(RemoteCertificateValidationCallback validationCallback = null, bool checkCertificateRevocation = false)
public MqttServerOptionsBuilder WithoutDefaultEndpoint()
{
_options.TlsEndpointOptions.ClientCertificateRequired = true;
_options.TlsEndpointOptions.CheckCertificateRevocation = checkCertificateRevocation;
_options.TlsEndpointOptions.RemoteCertificateValidationCallback = validationCallback;
_options.DefaultEndpointOptions.IsEnabled = false;
return this;
}
#endif
public MqttServerOptionsBuilder WithoutEncryptedEndpoint()
{
......@@ -137,6 +131,12 @@ namespace MQTTnet.Server
return this;
}
public MqttServerOptionsBuilder WithPersistentSessions(bool value = true)
{
_options.EnablePersistentSessions = value;
return this;
}
#if !WINDOWS_UWP
public MqttServerOptionsBuilder WithRemoteCertificateValidationCallback(RemoteCertificateValidationCallback value)
{
......@@ -144,40 +144,25 @@ namespace MQTTnet.Server
return this;
}
#endif
// public MqttServerOptionsBuilder WithApplicationMessageInterceptor(IMqttServerApplicationMessageInterceptor value)
// {
// _options.ApplicationMessageInterceptor = value;
// return this;
// }
//
// public MqttServerOptionsBuilder WithApplicationMessageInterceptor(Action<InterceptingMqttClientPublishEventArgs> value)
// {
// _options.ApplicationMessageInterceptor = new MqttServerApplicationMessageInterceptorDelegate(value);
// return this;
// }
//
// public MqttServerOptionsBuilder WithApplicationMessageInterceptor(Func<InterceptingMqttClientPublishEventArgs, Task> value)
// {
// _options.ApplicationMessageInterceptor = new MqttServerApplicationMessageInterceptorDelegate(value);
// return this;
// }
// public MqttServerOptionsBuilder WithMultiThreadedApplicationMessageInterceptor(Action<InterceptingMqttClientPublishEventArgs> value)
// {
// _options.ApplicationMessageInterceptor = new MqttServerMultiThreadedApplicationMessageInterceptorDelegate(value);
// return this;
// }
//
// public MqttServerOptionsBuilder WithMultiThreadedApplicationMessageInterceptor(Func<InterceptingMqttClientPublishEventArgs, Task> value)
// {
// _options.ApplicationMessageInterceptor = new MqttServerMultiThreadedApplicationMessageInterceptorDelegate(value);
// return this;
// }
public MqttServerOptionsBuilder WithDefaultEndpointReuseAddress()
public MqttServerOptionsBuilder WithTcpKeepAliveInterval(int value)
{
_options.DefaultEndpointOptions.ReuseAddress = true;
_options.DefaultEndpointOptions.TcpKeepAliveInterval = value;
_options.TlsEndpointOptions.TcpKeepAliveInterval = value;
return this;
}
public MqttServerOptionsBuilder WithTcpKeepAliveRetryCount(int value)
{
_options.DefaultEndpointOptions.TcpKeepAliveRetryCount = value;
_options.TlsEndpointOptions.TcpKeepAliveRetryCount = value;
return this;
}
public MqttServerOptionsBuilder WithTcpKeepAliveTime(int value)
{
_options.DefaultEndpointOptions.TcpKeepAliveTime = value;
_options.TlsEndpointOptions.TcpKeepAliveTime = value;
return this;
}
......@@ -187,24 +172,32 @@ namespace MQTTnet.Server
return this;
}
public MqttServerOptionsBuilder WithPersistentSessions(bool value = true)
#if !WINDOWS_UWP
public MqttServerOptionsBuilder WithEncryptionCertificate(byte[] value, IMqttServerCertificateCredentials credentials = null)
{
_options.EnablePersistentSessions = value;
if (value == null)
{
throw new ArgumentNullException(nameof(value));
}
_options.TlsEndpointOptions.CertificateProvider = new BlobCertificateProvider(value)
{
Password = credentials?.Password
};
return this;
}
// /// <summary>
// /// Gets or sets the client ID which is used when publishing messages from the server directly.
// /// </summary>
// public MqttServerOptionsBuilder WithClientId(string value)
// {
// _options.ClientId = value;
// return this;
// }
public MqttServerOptions Build()
public MqttServerOptionsBuilder WithEncryptionCertificate(X509Certificate2 certificate)
{
return _options;
if (certificate == null)
{
throw new ArgumentNullException(nameof(certificate));
}
_options.TlsEndpointOptions.CertificateProvider = new X509CertificateProvider(certificate);
return this;
}
#endif
}
}
}
\ No newline at end of file
......@@ -17,6 +17,30 @@ namespace MQTTnet.Server
public bool NoDelay { get; set; } = true;
/// <summary>
/// Gets or sets whether the sockets keep alive feature should be used.
/// The value _null_ indicates that the OS and framework defaults should be used.
/// </summary>
public bool? KeepAlive { get; set; }
/// <summary>
/// Gets or sets the TCP keep alive interval.
/// The value _null_ indicates that the OS and framework defaults should be used.
/// </summary>
public int? TcpKeepAliveInterval { get; set; }
/// <summary>
/// Gets or sets the TCP keep alive retry count.
/// The value _null_ indicates that the OS and framework defaults should be used.
/// </summary>
public int? TcpKeepAliveRetryCount { get; set; }
/// <summary>
/// Gets or sets the TCP keep alive time.
/// The value _null_ indicates that the OS and framework defaults should be used.
/// </summary>
public int? TcpKeepAliveTime { get; set; }
public LingerOption LingerState { get; set; } = new LingerOption(true, 0);
#if WINDOWS_UWP
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册