未验证 提交 21f07e17 编写于 作者: I Ivan Zlatanov 提交者: GitHub

Fixing concurrency issue in ManagedWebSocket where calling Dispose could...

Fixing concurrency issue in ManagedWebSocket where calling Dispose could corrupt state if there is pending send or receive operation and compression is enabled. (#87966)
上级 be86455f
......@@ -26,11 +26,7 @@ internal WebSocketDeflater(int windowBits, bool persisted)
public void Dispose()
{
if (_stream is not null)
{
_stream.Dispose();
_stream = null;
}
_stream?.Dispose();
}
public void ReleaseBuffer()
......
......@@ -58,11 +58,7 @@ internal WebSocketInflater(int windowBits, bool persisted)
public void Dispose()
{
if (_stream is not null)
{
_stream.Dispose();
_stream = null;
}
_stream?.Dispose();
ReleaseBuffer();
}
......
......@@ -213,13 +213,36 @@ private void DisposeCore()
_disposed = true;
_keepAliveTimer?.Dispose();
_stream.Dispose();
_inflater?.Dispose();
_deflater?.Dispose();
if (_state < WebSocketState.Aborted)
{
_state = WebSocketState.Closed;
}
DisposeSafe(_inflater, _receiveMutex);
DisposeSafe(_deflater, _sendMutex);
}
}
private static void DisposeSafe(IDisposable? resource, AsyncMutex mutex)
{
if (resource is not null)
{
Task lockTask = mutex.EnterAsync(CancellationToken.None);
if (lockTask.IsCompleted)
{
resource.Dispose();
mutex.Exit();
}
else
{
lockTask.GetAwaiter().UnsafeOnCompleted(() =>
{
resource.Dispose();
mutex.Exit();
});
}
}
}
......@@ -511,6 +534,8 @@ private async ValueTask SendFrameFallbackAsync(MessageOpcode opcode, bool endOfM
/// <summary>Writes a frame into the send buffer, which can then be sent over the network.</summary>
private int WriteFrameToSendBuffer(MessageOpcode opcode, bool endOfMessage, bool disableCompression, ReadOnlySpan<byte> payloadBuffer)
{
ObjectDisposedException.ThrowIf(_disposed, typeof(WebSocket));
if (_deflater is not null && !disableCompression)
{
payloadBuffer = _deflater.Deflate(payloadBuffer, endOfMessage);
......@@ -680,6 +705,8 @@ private async ValueTask<TResult> ReceiveAsyncPrivate<TResult>(Memory<byte> paylo
try
{
await _receiveMutex.EnterAsync(cancellationToken).ConfigureAwait(false);
ObjectDisposedException.ThrowIf(_disposed, typeof(WebSocket));
try
{
while (true) // in case we get control frames that should be ignored from the user's perspective
......
......@@ -646,6 +646,41 @@ public async Task CompressedMessageWithEmptyLastFrame()
Assert.Equal(frame1.Length + frame2.Length, messageSize);
}
[Fact]
public async Task DisposeShouldNotCorruptStateWhileReceiving()
{
WebSocketTestStream stream = new();
using WebSocket server = WebSocket.CreateFromStream(stream, new WebSocketCreationOptions
{
IsServer = true,
KeepAliveInterval = TimeSpan.Zero,
DangerousDeflateOptions = new WebSocketDeflateOptions()
});
using WebSocket client = WebSocket.CreateFromStream(stream.Remote, new WebSocketCreationOptions
{
IsServer = false,
KeepAliveInterval = TimeSpan.Zero,
DangerousDeflateOptions = new WebSocketDeflateOptions()
});
byte[] buffer = new byte[64];
// Send two messages so that the zlib stream has data in its internal dictionary
await SendTextAsync("Hello World", client);
await server.ReceiveAsync(buffer, CancellationToken.None);
buffer.AsSpan().Clear();
stream.DelayForNextRead = TimeSpan.FromSeconds(1);
stream.IgnoreCancellationToken = true;
await SendTextAsync("Hello Worlds", client);
Task<WebSocketReceiveResult> receiveTask = server.ReceiveAsync(buffer, CancellationToken.None);
server.Dispose();
var result = await receiveTask;
Assert.Equal("Hello Worlds", Encoding.UTF8.GetString(buffer.AsSpan(0, result.Count)));
}
private ValueTask SendTextAsync(string text, WebSocket websocket, bool disableCompression = false)
{
WebSocketMessageFlags flags = WebSocketMessageFlags.EndOfMessage;
......
......@@ -74,6 +74,18 @@ public Span<byte> NextAvailableBytes
/// </summary>
public TimeSpan DelayForNextSend { get; set; }
/// <summary>
/// If set, would cause the next read operation to be delayed
/// and complete asynchronously. Can be used to test cancellation tokens
/// and async code branches.
/// </summary>
public TimeSpan DelayForNextRead { get; set; }
/// <summary>
/// When set, ignores the cancellation token passed to ReadAsync and WriteAsync.
/// </summary>
public bool IgnoreCancellationToken { get; set; }
public override bool CanRead => true;
public override bool CanSeek => false;
......@@ -100,10 +112,16 @@ protected override void Dispose(bool disposing)
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
{
if (DelayForNextRead > TimeSpan.Zero)
{
await Task.Delay(DelayForNextRead);
DelayForNextRead = TimeSpan.Zero;
}
using CancellationTokenSource cancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposed.Token);
try
{
await _inputLock.WaitAsync(cancellation.Token).ConfigureAwait(false);
await _inputLock.WaitAsync(IgnoreCancellationToken ? default : cancellation.Token).ConfigureAwait(false);
}
catch (TaskCanceledException) when (cancellationToken.IsCancellationRequested)
{
......@@ -199,7 +217,7 @@ public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, Cancella
{
if (DelayForNextSend > TimeSpan.Zero)
{
await Task.Delay(DelayForNextSend, cancellationToken);
await Task.Delay(DelayForNextSend, IgnoreCancellationToken ? default : cancellationToken);
DelayForNextSend = TimeSpan.Zero;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册