未验证 提交 37eff15f 编写于 作者: M Marie Píchová 提交者: GitHub

[QUIC] Root listener/connection while waiting on new connection/stream event (#74450)

* Fixed GC collecting listener and/or connection while waiting on new connection/stream event

* Minor fixes and cleanups
上级 38ba41ef
......@@ -446,7 +446,6 @@
<Reference Include="System.Diagnostics.DiagnosticSource" />
<Reference Include="System.Diagnostics.Tracing" />
<Reference Include="System.IO.Compression" />
<Reference Include="System.Linq" />
<Reference Include="System.Memory" />
<Reference Include="System.Net.NameResolution" />
<Reference Include="System.Net.NetworkInformation" />
......
......@@ -7,7 +7,6 @@
using System.Runtime.Versioning;
using System.Net.Quic;
using System.IO;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
......@@ -155,7 +154,7 @@ private void CheckForShutdown()
if (_clientControl != null)
{
_clientControl.Dispose();
await _clientControl.DisposeAsync().ConfigureAwait(false);
_clientControl = null;
}
......@@ -245,7 +244,10 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, lon
}
finally
{
requestStream?.Dispose();
if (requestStream is not null)
{
await requestStream.DisposeAsync().ConfigureAwait(false);
}
}
}
......@@ -562,7 +564,6 @@ await using (stream.ConfigureAwait(false))
}
stream.Abort(QuicAbortDirection.Read, (long)Http3ErrorCode.StreamCreationError);
stream.Dispose();
return;
}
}
......
......@@ -392,6 +392,7 @@ public async ValueTask<QuicStream> AcceptInboundStreamAsync(CancellationToken ca
throw new InvalidOperationException(SR.net_quic_accept_not_allowed);
}
GCHandle keepObject = GCHandle.Alloc(this);
try
{
return await _acceptQueue.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
......@@ -401,6 +402,10 @@ public async ValueTask<QuicStream> AcceptInboundStreamAsync(CancellationToken ca
ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
throw;
}
finally
{
keepObject.Free();
}
}
/// <summary>
......
......@@ -162,6 +162,7 @@ public async ValueTask<QuicConnection> AcceptConnectionAsync(CancellationToken c
{
ObjectDisposedException.ThrowIf(_disposed == 1, this);
GCHandle keepObject = GCHandle.Alloc(this);
try
{
PendingConnection pendingConnection = await _acceptQueue.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
......@@ -175,6 +176,10 @@ await using (pendingConnection.ConfigureAwait(false))
ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
throw;
}
finally
{
keepObject.Free();
}
}
private unsafe int HandleEventNewConnection(ref NEW_CONNECTION_DATA data)
......
......@@ -398,7 +398,6 @@ public void Abort(QuicAbortDirection abortDirection, long errorCode)
QUIC_STREAM_SHUTDOWN_FLAGS flags = QUIC_STREAM_SHUTDOWN_FLAGS.NONE;
if (abortDirection.HasFlag(QuicAbortDirection.Read))
{
flags |= QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE;
if (_receiveTcs.TrySetException(ThrowHelper.GetOperationAbortedException(SR.net_quic_reading_aborted), final: true))
{
flags |= QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE;
......@@ -537,12 +536,12 @@ private unsafe int HandleEventShutdownComplete(ref SHUTDOWN_COMPLETE data)
(shutdownByApp: true, closedRemotely: true) => ThrowHelper.GetConnectionAbortedException((long)data.ConnectionErrorCode),
// It's local shutdown by app, this side called QuicConnection.CloseAsync, throw QuicError.OperationAborted.
(shutdownByApp: true, closedRemotely: false) => ThrowHelper.GetOperationAbortedException(),
// It's remote shutdown by transport, we received a CONNECTION_CLOSE frame with a QUIC transport error code
// It's remote shutdown by transport, we received a CONNECTION_CLOSE frame with a QUIC transport error code, throw error based on the status.
// TODO: we should propagate the transport error code
// https://github.com/dotnet/runtime/issues/72666
(shutdownByApp: false, closedRemotely: true) => ThrowHelper.GetExceptionForMsQuicStatus(data.ConnectionCloseStatus, $"Shutdown by transport {data.ConnectionErrorCode}"),
// It's local shutdown by transport, due to some timeout
// TODO: we should propagate transport error code
// It's local shutdown by transport, most likely due to a timeout, throw error based on the status.
// TODO: we should propagate the transport error code
// https://github.com/dotnet/runtime/issues/72666
(shutdownByApp: false, closedRemotely: false) => ThrowHelper.GetExceptionForMsQuicStatus(data.ConnectionCloseStatus),
};
......
......@@ -336,5 +336,33 @@ public async Task Connect_PeerCertificateDisposed(bool useGetter)
}
peerCertificate.Dispose();
}
[Fact]
public async Task Connection_AwaitsStream_ConnectionSurvivesGC()
{
const byte data = 0xDC;
TaskCompletionSource<IPEndPoint> listenerEndpointTcs = new TaskCompletionSource<IPEndPoint>();
await Task.WhenAll(
Task.Run(async () =>
{
await using var listener = await CreateQuicListener();
listenerEndpointTcs.SetResult(listener.LocalEndPoint);
await using var connection = await listener.AcceptConnectionAsync();
await using var stream = await connection.AcceptInboundStreamAsync();
var buffer = new byte[1];
Assert.Equal(1, await stream.ReadAsync(buffer));
Assert.Equal(data, buffer[0]);
}).WaitAsync(TimeSpan.FromSeconds(5)),
Task.Run(async () =>
{
var endpoint = await listenerEndpointTcs.Task;
await using var connection = await CreateQuicConnection(endpoint);
await Task.Delay(TimeSpan.FromSeconds(0.5));
GC.Collect();
await using var stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Unidirectional);
await stream.WriteAsync(new byte[1] { data }, completeWrites: true);
}).WaitAsync(TimeSpan.FromSeconds(5)));
}
}
}
......@@ -106,7 +106,7 @@ public async Task TwoListenersOnSamePort_DisjointAlpn_Success()
QuicListenerOptions listenerOptions = CreateQuicListenerOptions();
listenerOptions.ListenEndPoint = listener1.LocalEndPoint;
listenerOptions.ApplicationProtocols[0] = new SslApplicationProtocol("someprotocol");
listenerOptions.ConnectionOptionsCallback = (_, _, _) =>
listenerOptions.ConnectionOptionsCallback = (_, _, _) =>
{
var options = CreateQuicServerOptions();
options.ServerAuthenticationOptions.ApplicationProtocols[0] = listenerOptions.ApplicationProtocols[0];
......@@ -144,5 +144,27 @@ public async Task TwoListenersOnSamePort_SameAlpn_Throws()
//
await AssertThrowsQuicExceptionAsync(QuicError.InternalError, async () => await CreateQuicListener(listener.LocalEndPoint));
}
[Fact]
public async Task Listener_AwaitsConnection_ListenerSurvivesGC()
{
TaskCompletionSource<IPEndPoint> listenerEndpointTcs = new TaskCompletionSource<IPEndPoint>();
await Task.WhenAll(
Task.Run(async () =>
{
await using var listener = await CreateQuicListener();
listenerEndpointTcs.SetResult(listener.LocalEndPoint);
var connection = await listener.AcceptConnectionAsync();
await connection.DisposeAsync();
}).WaitAsync(TimeSpan.FromSeconds(5)),
Task.Run(async () =>
{
var endpoint = await listenerEndpointTcs.Task;
await Task.Delay(TimeSpan.FromSeconds(0.5));
GC.Collect();
var connection = await CreateQuicConnection(endpoint);
await connection.DisposeAsync();
}).WaitAsync(TimeSpan.FromSeconds(5)));
}
}
}
......@@ -102,7 +102,7 @@ protected override async Task<StreamPair> CreateConnectedStreamsAsync()
}
catch (Exception ex)
{
_output?.WriteLine($"Failed to {ex.Message}");
_output?.WriteLine($"Failed to connect: {ex.Message}");
throw;
}
}));
......@@ -153,14 +153,5 @@ public override void Dispose()
}
}
}
[OuterLoop("May take several seconds")]
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[SkipOnPlatform(TestPlatforms.LinuxBionic, "SElinux blocks UNIX sockets in our CI environment")]
[ActiveIssue("https://github.com/dotnet/runtime/issues/73377")]
public override Task Parallel_ReadWriteMultipleStreamsConcurrently()
{
return Task.CompletedTask;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册