未验证 提交 36c6d897 编写于 作者: C Christian 提交者: GitHub

Allow disabling of packet fragmentation (#1709)

* Add compatibility option for AWS

* Fix UWP build

* Fix comment in MqttClientOptions (#1706)

* Refactor naming

* Fix wrong option value

---------
Co-authored-by: NMichi <michael@volz.dev>
上级 d6cab767
......@@ -3,6 +3,7 @@
* [Client] Added support for passing MQTT v5 options (User properties etc.) for disconnects.
* [Client] An internal exception is no longer caught silently when calling _DisconnectAsync_ to indicate that the disconnect is not clean (BREAKING CHANGE).
* [Client] MQTTv5 features are now checked and an exception is thrown if they are used when using protocol version 3.1.1 and lower. These checks can be disabled in client options. (BREAKING CHANGE!).
* [Client] Added support for disabling packet fragmentation (required for i.e. AWS, #1690, thanks to @logicaloud).
* [Server] Exposed MQTT v5 sent properties from the affected client in _ClientDisconnectedAsync_ event.
* [Server] Fixed wrong client ID passed to _InterceptingUnsubscriptionEventArgs_ (#1631, thanks to @ghord).
* [Server] Exposed socket settings for TCP keep alive in TCP options (#1544).
......
......@@ -70,6 +70,32 @@ public static class Client_Connection_Samples
await mqttClient.DisconnectAsync(mqttClientDisconnectOptions, CancellationToken.None);
}
}
public static async Task Connect_With_Amazon_AWS()
{
/*
* This sample creates a simple MQTT client and connects to an Amazon Web Services broker.
*
* The broker requires special settings which are set here.
*/
var mqttFactory = new MqttFactory();
using (var mqttClient = mqttFactory.CreateMqttClient())
{
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("amazon.web.services.broker")
// Disabling packet fragmentation is very important!
.WithoutPacketFragmentation()
.Build();
await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
Console.WriteLine("The MQTT client is connected.");
await mqttClient.DisconnectAsync();
}
}
public static async Task Connect_Client_Timeout()
{
......
......@@ -48,7 +48,7 @@ namespace MQTTnet.AspNetCore
var formatter = new MqttPacketFormatterAdapter(new MqttBufferWriter(4096, 65535));
var channel = new MqttWebSocketChannel(webSocket, endpoint, isSecureConnection, clientCertificate);
using (var channelAdapter = new MqttChannelAdapter(channel, formatter, null, _logger))
using (var channelAdapter = new MqttChannelAdapter(channel, formatter, _logger))
{
await clientHandler(channelAdapter).ConfigureAwait(false);
}
......
......@@ -73,7 +73,7 @@ namespace MQTTnet.Benchmarks
var channel = new MemoryMqttChannel(_stream);
_channelAdapter = new MqttChannelAdapter(channel, serializer, null, new MqttNetEventLogger());
_channelAdapter = new MqttChannelAdapter(channel, serializer, new MqttNetEventLogger());
}
static byte[] Join(params ArraySegment<byte>[] chunks)
......
......@@ -54,7 +54,7 @@ namespace MQTTnet.Benchmarks
public void Deserialize_10000_Messages()
{
var channel = new BenchmarkMqttChannel(_serializedPacket);
var reader = new MqttChannelAdapter(channel, new MqttPacketFormatterAdapter(new MqttBufferWriter(4096, 65535)), null, new MqttNetEventLogger());
var reader = new MqttChannelAdapter(channel, new MqttPacketFormatterAdapter(new MqttBufferWriter(4096, 65535)), new MqttNetEventLogger());
for (var i = 0; i < 10000; i++)
{
......
......@@ -24,7 +24,7 @@ namespace MQTTnet.Extensions.ManagedClient
/// <summary>
/// Defines the maximum amount of topic filters which will be sent in a SUBSCRIBE/UNSUBSCRIBE packet.
/// Amazon AWS limits this number to 8. The default is int.MaxValue.
/// Amazon Web Services (AWS) limits this number to 8. The default is int.MaxValue.
/// </summary>
public int MaxTopicFiltersInSubscribeUnsubscribePackets { get; set; } = int.MaxValue;
}
......
......@@ -27,8 +27,10 @@ namespace MQTTnet.Extensions.WebSocket4Net
return new MqttChannelAdapter(
new MqttTcpChannel(options),
new MqttPacketFormatterAdapter(options.ProtocolVersion, new MqttBufferWriter(options.WriterBufferSize, options.WriterBufferSizeMax)),
packetInspector,
logger);
logger)
{
PacketInspector = packetInspector
};
}
case MqttClientWebSocketOptions webSocketOptions:
......@@ -36,8 +38,10 @@ namespace MQTTnet.Extensions.WebSocket4Net
return new MqttChannelAdapter(
new WebSocket4NetMqttChannel(options, webSocketOptions),
new MqttPacketFormatterAdapter(options.ProtocolVersion, new MqttBufferWriter(options.WriterBufferSize, options.WriterBufferSizeMax)),
packetInspector,
logger);
logger)
{
PacketInspector = packetInspector
};
}
default:
......
......@@ -566,7 +566,6 @@ namespace MQTTnet.Tests.Formatter
using (var adapter = new MqttChannelAdapter(
channel,
new MqttPacketFormatterAdapter(protocolVersion, new MqttBufferWriter(4096, 65535)),
null,
new MqttNetEventLogger()))
{
var receivedPacket = adapter.ReceivePacketAsync(CancellationToken.None).GetAwaiter().GetResult();
......@@ -582,19 +581,12 @@ namespace MQTTnet.Tests.Formatter
MqttProtocolVersion DeserializeAndDetectVersion(MqttPacketFormatterAdapter packetFormatterAdapter, byte[] buffer)
{
var channel = new MemoryMqttChannel(buffer);
var adapter = new MqttChannelAdapter(channel, packetFormatterAdapter, null, new MqttNetEventLogger());
var adapter = new MqttChannelAdapter(channel, packetFormatterAdapter, new MqttNetEventLogger());
adapter.ReceivePacketAsync(CancellationToken.None).GetAwaiter().GetResult();
return packetFormatterAdapter.ProtocolVersion;
}
MqttBufferReader ReaderFactory(byte[] data)
{
var reader = new MqttBufferReader();
reader.SetBuffer(data, 0, data.Length);
return reader;
}
TPacket Roundtrip<TPacket>(TPacket packet, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, MqttBufferWriter bufferWriter = null) where TPacket : MqttPacket
{
var writer = bufferWriter ?? WriterFactory();
......@@ -603,7 +595,7 @@ namespace MQTTnet.Tests.Formatter
using (var channel = new MemoryMqttChannel(buffer.Join().ToArray()))
{
var adapter = new MqttChannelAdapter(channel, new MqttPacketFormatterAdapter(protocolVersion, new MqttBufferWriter(4096, 65535)), null, new MqttNetEventLogger());
var adapter = new MqttChannelAdapter(channel, new MqttPacketFormatterAdapter(protocolVersion, new MqttBufferWriter(4096, 65535)), new MqttNetEventLogger());
return (TPacket)adapter.ReceivePacketAsync(CancellationToken.None).GetAwaiter().GetResult();
}
}
......
......@@ -31,7 +31,7 @@ namespace MQTTnet.Tests
{
var formatterAdapter = new MqttPacketFormatterAdapter(_protocolVersion, new MqttBufferWriter(4096, 65535));
var adapter = new MqttChannelAdapter(channel, formatterAdapter, null, MqttNetNullLogger.Instance);
var adapter = new MqttChannelAdapter(channel, formatterAdapter, MqttNetNullLogger.Instance);
return adapter.ReceivePacketAsync(CancellationToken.None).GetAwaiter().GetResult();
}
}
......
......@@ -27,18 +27,15 @@ namespace MQTTnet.Adapter
readonly byte[] _fixedHeaderBuffer = new byte[2];
readonly MqttNetSourceLogger _logger;
readonly MqttPacketInspector _packetInspector;
readonly byte[] _singleByteBuffer = new byte[1];
readonly AsyncLock _syncRoot = new AsyncLock();
Statistics _statistics; // mutable struct, don't make readonly!
Statistics _statistics; // mutable struct, don't make readonly!
public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, MqttPacketInspector packetInspector, IMqttNetLogger logger)
public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packetFormatterAdapter, IMqttNetLogger logger)
{
_channel = channel ?? throw new ArgumentNullException(nameof(channel));
_packetInspector = packetInspector;
PacketFormatterAdapter = packetFormatterAdapter ?? throw new ArgumentNullException(nameof(packetFormatterAdapter));
......@@ -64,6 +61,10 @@ namespace MQTTnet.Adapter
public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }
public MqttPacketInspector PacketInspector { get; set; }
public bool AllowPacketFragmentation { get; set; } = true;
public async Task ConnectAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
......@@ -126,7 +127,7 @@ namespace MQTTnet.Adapter
try
{
_packetInspector?.BeginReceivePacket();
PacketInspector?.BeginReceivePacket();
ReceivedMqttPacket receivedPacket;
var receivedPacketTask = ReceiveAsync(cancellationToken);
......@@ -144,7 +145,7 @@ namespace MQTTnet.Adapter
return null;
}
_packetInspector?.EndReceivePacket();
PacketInspector?.EndReceivePacket();
Interlocked.Add(ref _statistics._bytesSent, receivedPacket.TotalLength);
......@@ -180,7 +181,10 @@ namespace MQTTnet.Adapter
return null;
}
public void ResetStatistics() => _statistics.Reset();
public void ResetStatistics()
{
_statistics.Reset();
}
public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellationToken)
{
......@@ -197,18 +201,18 @@ namespace MQTTnet.Adapter
try
{
var packetBuffer = PacketFormatterAdapter.Encode(packet);
_packetInspector?.BeginSendPacket(packetBuffer);
PacketInspector?.BeginSendPacket(packetBuffer);
_logger.Verbose("TX ({0} bytes) >>> {1}", packetBuffer.Length, packet);
if (packetBuffer.Payload.Count > 0)
if (packetBuffer.Payload.Count == 0 || !AllowPacketFragmentation)
{
await _channel.WriteAsync(packetBuffer.Packet, false, cancellationToken).ConfigureAwait(false);
await _channel.WriteAsync(packetBuffer.Payload, true, cancellationToken).ConfigureAwait(false);
await _channel.WriteAsync(packetBuffer.Join(), true, cancellationToken).ConfigureAwait(false);
}
else
{
await _channel.WriteAsync(packetBuffer.Packet, true, cancellationToken).ConfigureAwait(false);
await _channel.WriteAsync(packetBuffer.Packet, false, cancellationToken).ConfigureAwait(false);
await _channel.WriteAsync(packetBuffer.Payload, true, cancellationToken).ConfigureAwait(false);
}
Interlocked.Add(ref _statistics._bytesReceived, packetBuffer.Length);
......@@ -270,7 +274,7 @@ namespace MQTTnet.Adapter
return 0;
}
_packetInspector?.FillReceiveBuffer(_singleByteBuffer);
PacketInspector?.FillReceiveBuffer(_singleByteBuffer);
encodedByte = _singleByteBuffer[0];
......@@ -309,7 +313,7 @@ namespace MQTTnet.Adapter
{
return ReadFixedHeaderResult.ConnectionClosed;
}
if (cancellationToken.IsCancellationRequested)
{
return ReadFixedHeaderResult.Canceled;
......@@ -323,7 +327,7 @@ namespace MQTTnet.Adapter
totalBytesRead += bytesRead;
}
_packetInspector?.FillReceiveBuffer(buffer);
PacketInspector?.FillReceiveBuffer(buffer);
var hasRemainingLength = buffer[1] != 0;
if (!hasRemainingLength)
......@@ -407,7 +411,7 @@ namespace MQTTnet.Adapter
bodyOffset += readBytes;
} while (bodyOffset < bodyLength);
_packetInspector?.FillReceiveBuffer(body);
PacketInspector?.FillReceiveBuffer(body);
var bodySegment = new ArraySegment<byte>(body, 0, bodyLength);
return new ReceivedMqttPacket(fixedHeader.Flags, bodySegment, fixedHeader.TotalLength);
......@@ -455,7 +459,7 @@ namespace MQTTnet.Adapter
throw new MqttCommunicationException(exception);
}
private struct Statistics
struct Statistics
{
public long _bytesReceived;
public long _bytesSent;
......
......@@ -24,6 +24,14 @@ namespace MQTTnet.Client
/// </summary>
public string AuthenticationMethod { get; set; }
/// <summary>
/// Usually the MQTT packets can be send partially. This is done by using multiple TCP packets
/// or WebSocket frames etc. Unfortunately not all brokers (like Amazon Web Services (AWS)) do support this feature and
/// will close the connection when receiving such packets. If such a service is used this flag must
/// be set to _false_.
/// </summary>
public bool AllowPacketFragmentation { get; set; } = true;
public IMqttClientChannelOptions ChannelOptions { get; set; }
/// <summary>
......@@ -214,4 +222,4 @@ namespace MQTTnet.Client
/// </summary>
public int WriterBufferSizeMax { get; set; } = 65535;
}
}
\ No newline at end of file
}
......@@ -75,10 +75,22 @@ namespace MQTTnet.Client
_options.ChannelOptions = (IMqttClientChannelOptions)_tcpOptions ?? _webSocketOptions;
MqttClientOptionsValidator.ThrowIfNotSupported(_options);
return _options;
}
/// <summary>
/// Usually the MQTT packets can be send partially. This is done by using multiple TCP packets
/// or WebSocket frames etc. Unfortunately not all brokers (like Amazon Web Services (AWS)) do support this feature and
/// will close the connection when receiving such packets. If such a service is used this flag must
/// be set to _true_.
/// </summary>
public MqttClientOptionsBuilder WithoutPacketFragmentation()
{
_options.AllowPacketFragmentation = false;
return this;
}
public MqttClientOptionsBuilder WithAuthentication(string method, byte[] data)
{
_options.AuthenticationMethod = method;
......@@ -266,7 +278,7 @@ namespace MQTTnet.Client
return this;
}
public MqttClientOptionsBuilder WithTcpServer(Action<MqttClientTcpOptions> optionsBuilder)
{
if (optionsBuilder == null)
......@@ -373,6 +385,18 @@ namespace MQTTnet.Client
return this;
}
public MqttClientOptionsBuilder WithWillContentType(string willContentType)
{
_options.WillContentType = willContentType;
return this;
}
public MqttClientOptionsBuilder WithWillCorrelationData(byte[] willCorrelationData)
{
_options.WillCorrelationData = willCorrelationData;
return this;
}
public MqttClientOptionsBuilder WithWillDelayInterval(uint willDelayInterval)
{
_options.WillDelayInterval = willDelayInterval;
......@@ -384,27 +408,33 @@ namespace MQTTnet.Client
_options.WillPayload = willPayload;
return this;
}
public MqttClientOptionsBuilder WithWillPayload(string willPayload)
{
if (string.IsNullOrEmpty(willPayload))
{
return WithWillPayload((byte[])null);
}
_options.WillPayload = Encoding.UTF8.GetBytes(willPayload);
return this;
}
public MqttClientOptionsBuilder WithWillPayloadFormatIndicator(MqttPayloadFormatIndicator willPayloadFormatIndicator)
{
_options.WillPayloadFormatIndicator = willPayloadFormatIndicator;
return this;
}
public MqttClientOptionsBuilder WithWillQualityOfServiceLevel(MqttQualityOfServiceLevel willQualityOfServiceLevel)
{
_options.WillQualityOfServiceLevel = willQualityOfServiceLevel;
return this;
}
public MqttClientOptionsBuilder WithWillTopic(string willTopic)
public MqttClientOptionsBuilder WithWillResponseTopic(string willResponseTopic)
{
_options.WillTopic = willTopic;
_options.WillResponseTopic = willResponseTopic;
return this;
}
......@@ -413,38 +443,20 @@ namespace MQTTnet.Client
_options.WillRetain = willRetain;
return this;
}
public MqttClientOptionsBuilder WithWillContentType(string willContentType)
{
_options.WillContentType = willContentType;
return this;
}
public MqttClientOptionsBuilder WithWillCorrelationData(byte[] willCorrelationData)
{
_options.WillCorrelationData = willCorrelationData;
return this;
}
public MqttClientOptionsBuilder WithWillResponseTopic(string willResponseTopic)
{
_options.WillResponseTopic = willResponseTopic;
return this;
}
public MqttClientOptionsBuilder WithWillPayloadFormatIndicator(MqttPayloadFormatIndicator willPayloadFormatIndicator)
public MqttClientOptionsBuilder WithWillTopic(string willTopic)
{
_options.WillPayloadFormatIndicator = willPayloadFormatIndicator;
_options.WillTopic = willTopic;
return this;
}
public MqttClientOptionsBuilder WithWillUserProperty(string name, string value)
{
if (_options.WillUserProperties == null)
{
_options.WillUserProperties = new List<MqttUserProperty>();
}
_options.WillUserProperties.Add(new MqttUserProperty(name, value));
return this;
}
......
......@@ -40,7 +40,12 @@ namespace MQTTnet.Implementations
var bufferWriter = new MqttBufferWriter(options.WriterBufferSize, options.WriterBufferSizeMax);
var packetFormatterAdapter = new MqttPacketFormatterAdapter(options.ProtocolVersion, bufferWriter);
return new MqttChannelAdapter(channel, packetFormatterAdapter, packetInspector, logger);
return new MqttChannelAdapter(channel, packetFormatterAdapter, logger)
{
AllowPacketFragmentation = options.AllowPacketFragmentation,
PacketInspector = packetInspector
};
}
}
}
......@@ -95,7 +95,7 @@ namespace MQTTnet.Implementations
var packetFormatterAdapter = new MqttPacketFormatterAdapter(bufferWriter);
var tcpChannel = new MqttTcpChannel(args.Socket, clientCertificate, _options);
using (var clientAdapter = new MqttChannelAdapter(tcpChannel, packetFormatterAdapter, null, _rootLogger))
using (var clientAdapter = new MqttChannelAdapter(tcpChannel, packetFormatterAdapter, _rootLogger))
{
await clientHandler(clientAdapter).ConfigureAwait(false);
}
......
......@@ -229,7 +229,7 @@ namespace MQTTnet.Implementations
var bufferWriter = new MqttBufferWriter(_serverOptions.WriterBufferSize, _serverOptions.WriterBufferSizeMax);
var packetFormatterAdapter = new MqttPacketFormatterAdapter(bufferWriter);
using (var clientAdapter = new MqttChannelAdapter(tcpChannel, packetFormatterAdapter, null, _rootLogger))
using (var clientAdapter = new MqttChannelAdapter(tcpChannel, packetFormatterAdapter, _rootLogger))
{
await clientHandler(clientAdapter).ConfigureAwait(false);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册