未验证 提交 3ab095a5 编写于 作者: C Christian 提交者: GitHub

Set web socket boundaries correctly (#1514)

上级 f5d9b651
* [Client] Keep alive mechanism now uses the configured timeout value from the options (thanks to @Stannieman, #1495).
\ No newline at end of file
* [Client] Keep alive mechanism now uses the configured timeout value from the options (thanks to @Stannieman, #1495).
* [Core] MQTT Packets being sent over web socket transport are now setting the web socket frame boundaries correctly (#1499).
\ No newline at end of file
......@@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using BenchmarkDotNet.Attributes;
using MQTTnet.Channel;
using MQTTnet.Implementations;
......@@ -70,8 +71,8 @@ namespace MQTTnet.Benchmarks
while (read < expected)
{
var readresult = await _clientChannel.ReadAsync(new byte[size], 0, size, CancellationToken.None).ConfigureAwait(false);
read += readresult;
var readResult = await _clientChannel.ReadAsync(new byte[size], 0, size, CancellationToken.None).ConfigureAwait(false);
read += readResult;
}
}
......@@ -79,9 +80,11 @@ namespace MQTTnet.Benchmarks
{
await Task.Yield();
var buffer = new ArraySegment<byte>(new byte[size]);
for (var i = 0; i < iterations; i++)
{
await _serverChannel.WriteAsync(new byte[size], 0, size, CancellationToken.None).ConfigureAwait(false);
await _serverChannel.WriteAsync(buffer, true, CancellationToken.None).ConfigureAwait(false);
}
}
}
......
......@@ -104,7 +104,7 @@ namespace MQTTnet.Benchmarks
return Task.FromResult(count);
}
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public Task WriteAsync(ArraySegment<byte> buffer, bool isEndOfPacket, CancellationToken cancellationToken)
{
throw new NotSupportedException();
}
......
......@@ -154,9 +154,9 @@ namespace MQTTnet.Extensions.WebSocket4Net
return Task.FromResult(readBytes);
}
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public Task WriteAsync(ArraySegment<byte> buffer, bool isEndOfPacket, CancellationToken cancellationToken)
{
_webSocket.Send(buffer, offset, count);
_webSocket.Send(buffer.Array, buffer.Offset, buffer.Count);
return Task.FromResult(0);
}
......
......@@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.IO;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
......@@ -45,9 +46,9 @@ namespace MQTTnet.Tests.Mockups
return _stream.ReadAsync(buffer, offset, count, cancellationToken);
}
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public Task WriteAsync(ArraySegment<byte> buffer, bool isEndOfPacket, CancellationToken cancellationToken)
{
return _stream.WriteAsync(buffer, offset, count, cancellationToken);
return _stream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
}
public void Dispose()
......
......@@ -188,11 +188,14 @@ namespace MQTTnet.Adapter
_logger.Verbose("TX ({0} bytes) >>> {1}", packetBuffer.Length, packet);
await _channel.WriteAsync(packetBuffer.Packet.Array, packetBuffer.Packet.Offset, packetBuffer.Packet.Count, cancellationToken).ConfigureAwait(false);
if (packetBuffer.Payload.Count > 0)
{
await _channel.WriteAsync(packetBuffer.Payload.Array, packetBuffer.Payload.Offset, packetBuffer.Payload.Count, cancellationToken).ConfigureAwait(false);
await _channel.WriteAsync(packetBuffer.Packet, false, cancellationToken).ConfigureAwait(false);
await _channel.WriteAsync(packetBuffer.Payload, true, cancellationToken).ConfigureAwait(false);
}
else
{
await _channel.WriteAsync(packetBuffer.Packet, true, cancellationToken).ConfigureAwait(false);
}
Interlocked.Add(ref _bytesReceived, packetBuffer.Length);
......
......@@ -12,13 +12,17 @@ namespace MQTTnet.Channel
public interface IMqttChannel : IDisposable
{
string Endpoint { get; }
bool IsSecureConnection { get; }
X509Certificate2 ClientCertificate { get; }
Task ConnectAsync(CancellationToken cancellationToken);
Task DisconnectAsync(CancellationToken cancellationToken);
Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken);
Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken);
Task WriteAsync(ArraySegment<byte> buffer, bool isEndOfPacket, CancellationToken cancellationToken);
}
}
......@@ -108,12 +108,12 @@ namespace MQTTnet.Implementations
return _readStream.ReadAsync(buffer, offset, count, cancellationToken);
}
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public Task WriteAsync(ArraySegment<byte> buffer, bool isEndOfPacket, CancellationToken cancellationToken)
{
// In the write method only the internal buffer will be filled. So here is no
// async/await required. The real network transmit is done when calling the
// Flush method.
_writeStream.Write(buffer, offset, count);
_writeStream.Write(buffer.Array, buffer.Offset, buffer.Count);
return _writeStream.FlushAsync(cancellationToken);
}
......
......@@ -225,7 +225,7 @@ namespace MQTTnet.Implementations
}
}
public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public async Task WriteAsync(ArraySegment<byte> buffer, bool isEndOfPacket, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
......@@ -239,12 +239,12 @@ namespace MQTTnet.Implementations
}
#if NETCOREAPP3_0_OR_GREATER || NETSTANDARD2_1_OR_GREATER
await stream.WriteAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(buffer.AsMemory(), cancellationToken).ConfigureAwait(false);
#else
// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(_disposeAction))
{
await stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken).ConfigureAwait(false);
}
#endif
}
......
......@@ -2,15 +2,15 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using MQTTnet.Channel;
using MQTTnet.Internal;
using System;
using System.Net;
using System.Net.WebSockets;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Channel;
using MQTTnet.Client;
using MQTTnet.Internal;
namespace MQTTnet.Implementations
{
......@@ -35,12 +35,12 @@ namespace MQTTnet.Implementations
ClientCertificate = clientCertificate;
}
public X509Certificate2 ClientCertificate { get; }
public string Endpoint { get; }
public bool IsSecureConnection { get; private set; }
public X509Certificate2 ClientCertificate { get; }
public async Task ConnectAsync(CancellationToken cancellationToken)
{
var uri = _options.Uri;
......@@ -89,14 +89,26 @@ namespace MQTTnet.Implementations
Cleanup();
}
public void Dispose()
{
Cleanup();
}
public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken).ConfigureAwait(false);
return response.Count;
}
public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public async Task WriteAsync(ArraySegment<byte> buffer, bool isEndOfPacket, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
#if NET5_0_OR_GREATER
// MQTT Control Packets MUST be sent in WebSocket binary data frames. If any other type of data frame is received the recipient MUST close the Network Connection [MQTT-6.0.0-1].
// A single WebSocket data frame can contain multiple or partial MQTT Control Packets. The receiver MUST NOT assume that MQTT Control Packets are aligned on WebSocket frame boundaries [MQTT-6.0.0-2].
await _webSocket.SendAsync(buffer, WebSocketMessageType.Binary, isEndOfPacket, cancellationToken).ConfigureAwait(false);
#else
// The lock is required because the client will throw an exception if _SendAsync_ is
// called from multiple threads at the same time. But this issue only happens with several
// framework versions.
......@@ -107,13 +119,51 @@ namespace MQTTnet.Implementations
using (await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false))
{
await _webSocket.SendAsync(new ArraySegment<byte>(buffer, offset, count), WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false);
await _webSocket.SendAsync(buffer, WebSocketMessageType.Binary, isEndOfPacket, cancellationToken).ConfigureAwait(false);
}
#endif
}
public void Dispose()
void Cleanup()
{
Cleanup();
_sendLock?.Dispose();
_sendLock = null;
try
{
_webSocket?.Dispose();
}
catch (ObjectDisposedException)
{
}
finally
{
_webSocket = null;
}
}
IWebProxy CreateProxy()
{
if (string.IsNullOrEmpty(_options.ProxyOptions?.Address))
{
return null;
}
#if WINDOWS_UWP
throw new NotSupportedException("Proxies are not supported when using 'uap10.0'.");
#elif NETSTANDARD1_3
throw new NotSupportedException("Proxies are not supported when using 'netstandard 1.3'.");
#else
var proxyUri = new Uri(_options.ProxyOptions.Address);
if (!string.IsNullOrEmpty(_options.ProxyOptions.Username) && !string.IsNullOrEmpty(_options.ProxyOptions.Password))
{
var credentials = new NetworkCredential(_options.ProxyOptions.Username, _options.ProxyOptions.Password, _options.ProxyOptions.Domain);
return new WebProxy(proxyUri, _options.ProxyOptions.BypassOnLocal, _options.ProxyOptions.BypassList, credentials);
}
return new WebProxy(proxyUri, _options.ProxyOptions.BypassOnLocal, _options.ProxyOptions.BypassList);
#endif
}
void SetupClientWebSocket(ClientWebSocket clientWebSocket)
......@@ -154,7 +204,6 @@ namespace MQTTnet.Implementations
#else
clientWebSocket.Options.ClientCertificates.Add(certificate);
#endif
}
}
......@@ -188,47 +237,5 @@ namespace MQTTnet.Implementations
#endif
}
}
void Cleanup()
{
_sendLock?.Dispose();
_sendLock = null;
try
{
_webSocket?.Dispose();
}
catch (ObjectDisposedException)
{
}
finally
{
_webSocket = null;
}
}
IWebProxy CreateProxy()
{
if (string.IsNullOrEmpty(_options.ProxyOptions?.Address))
{
return null;
}
#if WINDOWS_UWP
throw new NotSupportedException("Proxies are not supported when using 'uap10.0'.");
#elif NETSTANDARD1_3
throw new NotSupportedException("Proxies are not supported when using 'netstandard 1.3'.");
#else
var proxyUri = new Uri(_options.ProxyOptions.Address);
if (!string.IsNullOrEmpty(_options.ProxyOptions.Username) && !string.IsNullOrEmpty(_options.ProxyOptions.Password))
{
var credentials = new NetworkCredential(_options.ProxyOptions.Username, _options.ProxyOptions.Password, _options.ProxyOptions.Domain);
return new WebProxy(proxyUri, _options.ProxyOptions.BypassOnLocal, _options.ProxyOptions.BypassList, credentials);
}
return new WebProxy(proxyUri, _options.ProxyOptions.BypassOnLocal, _options.ProxyOptions.BypassList);
#endif
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册