未验证 提交 2a054e79 编写于 作者: C Cory Nelson 提交者: GitHub

Implement QUIC abort exceptions, fix channel usage, and implement some tests. (#32051)

New: implement QuicStreamAbortedException, QuicConnectionAbortedException, QuicOperationAbortedException.
Fix: MsQuic returning null implementation when completing listener/connection channels.
Fix: some flags types being defined as 8-bit when they are 32-bit in msquic.h
Add some additional tests demonstrating failures seen in HttpClient.
上级 21719502
......@@ -95,4 +95,8 @@ public class QuicStreamAbortedException : QuicException
public QuicStreamAbortedException(string message, long errorCode) : base(message) { }
public long ErrorCode { get; }
}
public class QuicOperationAbortedException : QuicException
{
public QuicOperationAbortedException(string message) : base(message) { }
}
}
......@@ -55,7 +55,7 @@ internal enum QUIC_STREAM_SHUTDOWN_FLAG : uint
}
[Flags]
internal enum QUIC_RECEIVE_FLAG : byte
internal enum QUIC_RECEIVE_FLAG : uint
{
NONE = 0,
ZERO_RTT = 0x1,
......@@ -133,12 +133,12 @@ internal enum QUIC_PARAM_STREAM : uint
IDEAL_SEND_BUFFER = 2
}
internal enum QUIC_LISTENER_EVENT : byte
internal enum QUIC_LISTENER_EVENT : uint
{
NEW_CONNECTION = 0
}
internal enum QUIC_CONNECTION_EVENT : byte
internal enum QUIC_CONNECTION_EVENT : uint
{
CONNECTED = 0,
SHUTDOWN_INITIATED_BY_TRANSPORT = 1,
......@@ -152,7 +152,7 @@ internal enum QUIC_CONNECTION_EVENT : byte
IDEAL_PROCESSOR_CHANGED = 9,
}
internal enum QUIC_STREAM_EVENT : byte
internal enum QUIC_STREAM_EVENT : uint
{
START_COMPLETE = 0,
RECEIVE = 1,
......
......@@ -183,7 +183,7 @@ internal struct ConnectionEventDataShutdownBegin
[StructLayout(LayoutKind.Sequential)]
internal struct ConnectionEventDataShutdownBeginPeer
{
internal ushort ErrorCode;
internal long ErrorCode;
}
[StructLayout(LayoutKind.Sequential)]
......@@ -264,7 +264,7 @@ internal struct ConnectionEvent
internal bool EarlyDataAccepted => Data.Connected.EarlyDataAccepted;
internal ulong NumBytes => Data.IdealSendBuffer.NumBytes;
internal uint ShutdownBeginStatus => Data.ShutdownBegin.Status;
internal ushort ShutdownBeginPeerStatus => Data.ShutdownBeginPeer.ErrorCode;
internal long ShutdownBeginPeerStatus => Data.ShutdownBeginPeer.ErrorCode;
internal bool ShutdownTimedOut => Data.ShutdownComplete.TimedOut;
internal ushort BiDirectionalCount => Data.StreamsAvailable.BiDirectionalCount;
internal ushort UniDirectionalCount => Data.StreamsAvailable.UniDirectionalCount;
......@@ -324,13 +324,13 @@ internal bool IsCanceled()
[StructLayout(LayoutKind.Sequential)]
internal struct StreamEventDataPeerSendAbort
{
internal ushort ErrorCode;
internal long ErrorCode;
}
[StructLayout(LayoutKind.Sequential)]
internal struct StreamEventDataPeerRecvAbort
{
internal ushort ErrorCode;
internal long ErrorCode;
}
[StructLayout(LayoutKind.Sequential)]
......
......@@ -117,10 +117,16 @@
<resheader name="writer">
<value>System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089</value>
</resheader>
<data name="net_quic_connectionaborted" xml:space="preserve">
<value>Connection aborted by peer ({0}).</value>
</data>
<data name="net_quic_notsupported" xml:space="preserve">
<value>QUIC is not supported on this platform. See http://aka.ms/dotnetquic</value>
</data>
<data name="net_quic_placeholdertext" xml:space="preserve">
<value>Placeholder text</value>
<data name="net_quic_operationaborted" xml:space="preserve">
<value>Operation aborted.</value>
</data>
<data name="net_quic_streamaborted" xml:space="preserve">
<value>Stream aborted by peer ({0}).</value>
</data>
</root>
\ No newline at end of file
......@@ -32,6 +32,7 @@
<Compile Include="System\Net\Quic\QuicConnection.cs" />
<Compile Include="System\Net\Quic\QuicListener.cs" />
<Compile Include="System\Net\Quic\QuicListenerOptions.cs" />
<Compile Include="System\Net\Quic\QuicOperationAbortedException.cs" />
<Compile Include="System\Net\Quic\QuicStream.cs" />
<Compile Include="System\Net\Quic\QuicException.cs" />
<Compile Include="System\Net\Quic\QuicConnectionAbortedException.cs" />
......
......@@ -39,6 +39,7 @@ internal sealed class MsQuicConnection : QuicConnectionProvider
private bool _disposed;
private bool _connected;
private MsQuicSecurityConfig _securityConfig;
private long _abortErrorCode = -1;
// Queue for accepted streams
private readonly Channel<MsQuicStream> _acceptQueue = Channel.CreateUnbounded<MsQuicStream>(new UnboundedChannelOptions()
......@@ -200,6 +201,7 @@ private uint HandleEventShutdownInitiatedByTransport(ConnectionEvent connectionE
private uint HandleEventShutdownInitiatedByPeer(ConnectionEvent connectionEvent)
{
_abortErrorCode = connectionEvent.Data.ShutdownBeginPeer.ErrorCode;
_acceptQueue.Writer.Complete();
return MsQuicStatusCodes.Success;
}
......@@ -237,18 +239,23 @@ internal override async ValueTask<QuicStreamProvider> AcceptStreamAsync(Cancella
ThrowIfDisposed();
if (await _acceptQueue.Reader.WaitToReadAsync(cancellationToken))
MsQuicStream stream;
try
{
stream = await _acceptQueue.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException)
{
if (_acceptQueue.Reader.TryRead(out MsQuicStream stream))
throw _abortErrorCode switch
{
if (NetEventSource.IsEnabled) NetEventSource.Exit(this);
return stream;
}
-1 => new QuicOperationAbortedException(), // Shutdown initiated by us.
long err => new QuicConnectionAbortedException(err) // Shutdown initiated by peer.
};
}
if (NetEventSource.IsEnabled) NetEventSource.Exit(this);
return null;
return stream;
}
internal override QuicStreamProvider OpenUnidirectionalStream()
......
......@@ -12,7 +12,7 @@
namespace System.Net.Quic.Implementations.MsQuic
{
internal class MsQuicListener : QuicListenerProvider, IDisposable
internal sealed class MsQuicListener : QuicListenerProvider, IDisposable
{
// Security configuration for MsQuic
private MsQuicSession _session;
......@@ -65,21 +65,21 @@ internal override async ValueTask<QuicConnectionProvider> AcceptConnectionAsync(
ThrowIfDisposed();
if (await _acceptConnectionQueue.Reader.WaitToReadAsync())
{
if (_acceptConnectionQueue.Reader.TryRead(out MsQuicConnection connection))
{
// resolve security config here.
await connection.SetSecurityConfigForConnection(_sslOptions.ServerCertificate);
if (NetEventSource.IsEnabled) NetEventSource.Exit(this);
MsQuicConnection connection;
return connection;
}
try
{
connection = await _acceptConnectionQueue.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException)
{
throw new QuicOperationAbortedException();
}
if (NetEventSource.IsEnabled) NetEventSource.Exit(this);
await connection.SetSecurityConfigForConnection(_sslOptions.ServerCertificate);
return null;
if (NetEventSource.IsEnabled) NetEventSource.Exit(this);
return connection;
}
public override void Dispose()
......@@ -174,7 +174,7 @@ private unsafe void SetListenPort()
}
}
protected void StopAcceptingConnections()
private void StopAcceptingConnections()
{
_acceptConnectionQueue.Writer.TryComplete();
}
......
......@@ -48,10 +48,12 @@ internal sealed class MsQuicStream : QuicStreamProvider
private StartState _started;
private ReadState _readState;
private long _readErrorCode = -1;
private ShutdownWriteState _shutdownState;
private SendState _sendState;
private long _sendErrorCode = -1;
// Used by the class to indicate that the stream is m_Readable.
private readonly bool _canRead;
......@@ -245,7 +247,11 @@ internal override async ValueTask<int> ReadAsync(Memory<byte> destination, Cance
}
else if (_readState == ReadState.Aborted)
{
throw new IOException("Reading has been aborted by the peer.");
throw _readErrorCode switch
{
-1 => new QuicOperationAbortedException(),
long err => new QuicStreamAbortedException(err)
};
}
}
......@@ -402,6 +408,7 @@ internal override void Write(ReadOnlySpan<byte> buffer)
{
ThrowIfDisposed();
// TODO: optimize this.
WriteAsync(buffer.ToArray()).GetAwaiter().GetResult();
}
......@@ -532,14 +539,13 @@ private uint HandleEvent(ref StreamEvent evt)
// Peer has told us to abort the reading side of the stream.
case QUIC_STREAM_EVENT.PEER_SEND_ABORTED:
{
status = HandleEventPeerSendAborted();
status = HandleEventPeerSendAborted(ref evt);
}
break;
// Peer has stopped receiving data, don't send anymore.
// Potentially throw when WriteAsync/FlushAsync.
case QUIC_STREAM_EVENT.PEER_RECEIVE_ABORTED:
{
status = HandleEventPeerRecvAbort();
status = HandleEventPeerRecvAborted(ref evt);
}
break;
// Occurs when shutdown is completed for the send side.
......@@ -598,9 +604,26 @@ private unsafe uint HandleEventRecv(ref MsQuicNativeMethods.StreamEvent evt)
return MsQuicStatusCodes.Pending;
}
private uint HandleEventPeerRecvAbort()
private uint HandleEventPeerRecvAborted(ref StreamEvent evt)
{
if (NetEventSource.IsEnabled) NetEventSource.Enter(this);
bool shouldComplete = false;
lock (_sync)
{
if (_sendState == SendState.None)
{
shouldComplete = true;
}
_sendState = SendState.Aborted;
_sendErrorCode = evt.Data.PeerSendAbort.ErrorCode;
}
if (shouldComplete)
{
_sendResettableCompletionSource.CompleteException(new QuicStreamAbortedException(_sendErrorCode));
}
if (NetEventSource.IsEnabled) NetEventSource.Exit(this);
return MsQuicStatusCodes.Success;
......@@ -696,7 +719,7 @@ private uint HandleEventShutdownComplete()
return MsQuicStatusCodes.Success;
}
private uint HandleEventPeerSendAborted()
private uint HandleEventPeerSendAborted(ref StreamEvent evt)
{
if (NetEventSource.IsEnabled) NetEventSource.Enter(this);
......@@ -708,11 +731,12 @@ private uint HandleEventPeerSendAborted()
shouldComplete = true;
}
_readState = ReadState.Aborted;
_readErrorCode = evt.Data.PeerSendAbort.ErrorCode;
}
if (shouldComplete)
{
_receiveResettableCompletionSource.CompleteException(new IOException("Reading has been aborted by the peer."));
_receiveResettableCompletionSource.CompleteException(new QuicStreamAbortedException(_readErrorCode));
}
if (NetEventSource.IsEnabled) NetEventSource.Exit(this);
......
......@@ -6,6 +6,11 @@ namespace System.Net.Quic
{
public class QuicConnectionAbortedException : QuicException
{
internal QuicConnectionAbortedException(long errorCode)
: this(SR.Format(SR.net_quic_connectionaborted, errorCode), errorCode)
{
}
public QuicConnectionAbortedException(string message, long errorCode)
: base (message)
{
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
namespace System.Net.Quic
{
public class QuicOperationAbortedException : QuicException
{
internal QuicOperationAbortedException()
: base(SR.net_quic_operationaborted)
{
}
public QuicOperationAbortedException(string message) : base(message)
{
}
}
}
......@@ -6,8 +6,13 @@ namespace System.Net.Quic
{
public class QuicStreamAbortedException : QuicException
{
internal QuicStreamAbortedException(long errorCode)
: this(SR.Format(SR.net_quic_streamaborted, errorCode), errorCode)
{
}
public QuicStreamAbortedException(string message, long errorCode)
: base (message)
: base(message)
{
ErrorCode = errorCode;
}
......
using System.Collections.Generic;
using System.Net.Security;
using System.Threading.Tasks;
namespace System.Net.Quic.Tests
{
public class MsQuicTestBase : IDisposable
public class MsQuicTestBase
{
public MsQuicTestBase()
{
IPEndPoint endpoint = new IPEndPoint(IPAddress.Loopback, 8000);
DefaultListener = CreateQuicListener(endpoint);
}
public QuicListener DefaultListener { get; }
public SslServerAuthenticationOptions GetSslServerAuthenticationOptions()
{
return new SslServerAuthenticationOptions()
......@@ -34,6 +27,11 @@ public QuicConnection CreateQuicConnection(IPEndPoint endpoint)
return new QuicConnection(QuicImplementationProviders.MsQuic, endpoint, GetSslClientAuthenticationOptions());
}
public QuicListener CreateQuicListener()
{
return CreateQuicListener(new IPEndPoint(IPAddress.Loopback, 0));
}
public QuicListener CreateQuicListener(IPEndPoint endpoint)
{
QuicListener listener = new QuicListener(QuicImplementationProviders.MsQuic, endpoint, GetSslServerAuthenticationOptions());
......@@ -41,9 +39,24 @@ public QuicListener CreateQuicListener(IPEndPoint endpoint)
return listener;
}
public void Dispose()
public async Task RunClientServer(Func<QuicConnection, Task> clientFunction, Func<QuicConnection, Task> serverFunction, int millisecondsTimeout = 10_000)
{
DefaultListener.Dispose();
using QuicListener listener = CreateQuicListener();
await new[]
{
Task.Run(async () =>
{
using QuicConnection serverConnection = await listener.AcceptConnectionAsync();
await serverFunction(serverConnection);
}),
Task.Run(async () =>
{
using QuicConnection clientConnection = CreateQuicConnection(listener.ListenEndPoint);
await clientConnection.ConnectAsync();
await clientFunction(clientConnection);
})
}.WhenAllOrAnyFailed(millisecondsTimeout);
}
}
}
......@@ -21,11 +21,13 @@ public class MsQuicTests : MsQuicTestBase
[Fact]
public async Task BasicTest()
{
using QuicListener listener = CreateQuicListener();
for (int i = 0; i < 100; i++)
{
Task listenTask = Task.Run(async () =>
{
using QuicConnection connection = await DefaultListener.AcceptConnectionAsync();
using QuicConnection connection = await listener.AcceptConnectionAsync();
await using QuicStream stream = await connection.AcceptStreamAsync();
byte[] buffer = new byte[s_data.Length];
......@@ -42,7 +44,7 @@ public async Task BasicTest()
Task clientTask = Task.Run(async () =>
{
using QuicConnection connection = CreateQuicConnection(DefaultListener.ListenEndPoint);
using QuicConnection connection = CreateQuicConnection(listener.ListenEndPoint);
await connection.ConnectAsync();
await using QuicStream stream = connection.OpenBidirectionalStream();
......@@ -66,12 +68,14 @@ public async Task BasicTest()
[Fact]
public async Task MultipleReadsAndWrites()
{
using QuicListener listener = CreateQuicListener();
for (int j = 0; j < 100; j++)
{
Task listenTask = Task.Run(async () =>
{
// Connection isn't being accepted, interesting.
using QuicConnection connection = await DefaultListener.AcceptConnectionAsync();
using QuicConnection connection = await listener.AcceptConnectionAsync();
await using QuicStream stream = await connection.AcceptStreamAsync();
byte[] buffer = new byte[s_data.Length];
......@@ -97,7 +101,7 @@ public async Task MultipleReadsAndWrites()
Task clientTask = Task.Run(async () =>
{
using QuicConnection connection = CreateQuicConnection(DefaultListener.ListenEndPoint);
using QuicConnection connection = CreateQuicConnection(listener.ListenEndPoint);
await connection.ConnectAsync();
await using QuicStream stream = connection.OpenBidirectionalStream();
......@@ -131,10 +135,12 @@ public async Task MultipleReadsAndWrites()
[Fact]
public async Task MultipleStreamsOnSingleConnection()
{
using QuicListener listener = CreateQuicListener();
Task listenTask = Task.Run(async () =>
{
{
using QuicConnection connection = await DefaultListener.AcceptConnectionAsync();
using QuicConnection connection = await listener.AcceptConnectionAsync();
await using QuicStream stream = await connection.AcceptStreamAsync();
await using QuicStream stream2 = await connection.AcceptStreamAsync();
......@@ -173,7 +179,7 @@ public async Task MultipleStreamsOnSingleConnection()
Task clientTask = Task.Run(async () =>
{
using QuicConnection connection = CreateQuicConnection(DefaultListener.ListenEndPoint);
using QuicConnection connection = CreateQuicConnection(listener.ListenEndPoint);
await connection.ConnectAsync();
await using QuicStream stream = connection.OpenBidirectionalStream();
await using QuicStream stream2 = connection.OpenBidirectionalStream();
......@@ -210,23 +216,6 @@ public async Task MultipleStreamsOnSingleConnection()
await (new[] { listenTask, clientTask }).WhenAllOrAnyFailed(millisecondsTimeout: 60000);
}
[Fact]
public async Task AbortiveConnectionFromClient()
{
using QuicConnection clientConnection = CreateQuicConnection(DefaultListener.ListenEndPoint);
ValueTask clientTask = clientConnection.ConnectAsync();
using QuicConnection serverConnection = await DefaultListener.AcceptConnectionAsync();
await clientTask;
// Close connection on client, verifying server connection is aborted.
await clientConnection.CloseAsync(errorCode: 0);
QuicStream stream = await serverConnection.AcceptStreamAsync();
// Providers are alaways wrapped right now by a QuicStream. All fields are null here.
// TODO make sure this returns null.
Assert.Throws<NullReferenceException>(() => stream.CanRead);
}
[Fact]
public async Task TestStreams()
{
......@@ -268,10 +257,11 @@ public async Task TestStreams()
[Fact]
public async Task UnidirectionalAndBidirectionalStreamCountsWork()
{
using QuicConnection clientConnection = CreateQuicConnection(DefaultListener.ListenEndPoint);
using QuicListener listener = CreateQuicListener();
using QuicConnection clientConnection = CreateQuicConnection(listener.ListenEndPoint);
ValueTask clientTask = clientConnection.ConnectAsync();
using QuicConnection serverConnection = await DefaultListener.AcceptConnectionAsync();
using QuicConnection serverConnection = await listener.AcceptConnectionAsync();
await clientTask;
Assert.Equal(100, serverConnection.GetRemoteAvailableBidirectionalStreamCount());
Assert.Equal(100, serverConnection.GetRemoteAvailableUnidirectionalStreamCount());
......@@ -280,18 +270,20 @@ public async Task UnidirectionalAndBidirectionalStreamCountsWork()
[Fact]
public async Task UnidirectionalAndBidirectionalChangeValues()
{
using QuicListener listener = CreateQuicListener();
QuicClientConnectionOptions options = new QuicClientConnectionOptions()
{
MaxBidirectionalStreams = 10,
MaxUnidirectionalStreams = 20,
RemoteEndPoint = DefaultListener.ListenEndPoint,
RemoteEndPoint = listener.ListenEndPoint,
ClientAuthenticationOptions = GetSslClientAuthenticationOptions()
};
using QuicConnection clientConnection = new QuicConnection(QuicImplementationProviders.MsQuic, options);
ValueTask clientTask = clientConnection.ConnectAsync();
using QuicConnection serverConnection = await DefaultListener.AcceptConnectionAsync();
using QuicConnection serverConnection = await listener.AcceptConnectionAsync();
await clientTask;
Assert.Equal(20, clientConnection.GetRemoteAvailableUnidirectionalStreamCount());
Assert.Equal(10, clientConnection.GetRemoteAvailableBidirectionalStreamCount());
......@@ -302,10 +294,11 @@ public async Task UnidirectionalAndBidirectionalChangeValues()
[Fact]
public async Task CallDifferentWriteMethodsWorks()
{
using QuicConnection clientConnection = CreateQuicConnection(DefaultListener.ListenEndPoint);
using QuicListener listener = CreateQuicListener();
using QuicConnection clientConnection = CreateQuicConnection(listener.ListenEndPoint);
ValueTask clientTask = clientConnection.ConnectAsync();
using QuicConnection serverConnection = await DefaultListener.AcceptConnectionAsync();
using QuicConnection serverConnection = await listener.AcceptConnectionAsync();
await clientTask;
ReadOnlyMemory<byte> helloWorld = Encoding.ASCII.GetBytes("Hello world!");
......@@ -328,69 +321,6 @@ public async Task CallDifferentWriteMethodsWorks()
Assert.Equal(24, res);
}
[Theory]
[MemberData(nameof(QuicStream_ReadWrite_Random_Success_Data))]
public async Task QuicStream_ReadWrite_Random_Success(int readSize, int writeSize)
{
byte[] testBuffer = new byte[8192];
new Random().NextBytes(testBuffer);
await Task.WhenAll(DoWrite(), DoRead());
async Task DoWrite()
{
using QuicConnection clientConnection = CreateQuicConnection(DefaultListener.ListenEndPoint);
await clientConnection.ConnectAsync();
await using QuicStream clientStream = clientConnection.OpenUnidirectionalStream();
ReadOnlyMemory<byte> sendBuffer = testBuffer;
while (sendBuffer.Length != 0)
{
ReadOnlyMemory<byte> chunk = sendBuffer.Slice(0, Math.Min(sendBuffer.Length, writeSize));
await clientStream.WriteAsync(chunk);
sendBuffer = sendBuffer.Slice(chunk.Length);
}
clientStream.Shutdown();
await clientStream.ShutdownWriteCompleted();
}
async Task DoRead()
{
using QuicConnection serverConnection = await DefaultListener.AcceptConnectionAsync();
await using QuicStream serverStream = await serverConnection.AcceptStreamAsync();
byte[] receiveBuffer = new byte[testBuffer.Length];
int totalBytesRead = 0;
while (totalBytesRead != receiveBuffer.Length)
{
int bytesRead = await serverStream.ReadAsync(receiveBuffer.AsMemory(totalBytesRead, Math.Min(receiveBuffer.Length - totalBytesRead, readSize)));
if (bytesRead == 0)
{
break;
}
totalBytesRead += bytesRead;
}
Assert.True(receiveBuffer.AsSpan().SequenceEqual(testBuffer));
}
}
public static IEnumerable<object[]> QuicStream_ReadWrite_Random_Success_Data()
{
IEnumerable<int> sizes = Enumerable.Range(1, 8).Append(2048).Append(8192);
return
from readSize in sizes
from writeSize in sizes
select new object[] { readSize, writeSize };
}
private static async Task CreateAndTestBidirectionalStream(QuicConnection c1, QuicConnection c2)
{
using (QuicStream s1 = c1.OpenBidirectionalStream())
......
......@@ -11,178 +11,28 @@
namespace System.Net.Quic.Tests
{
public class QuicConnectionTests
[ConditionalClass(typeof(QuicConnection), nameof(QuicConnection.IsQuicSupported))]
public class QuicConnectionTests : MsQuicTestBase
{
private static ReadOnlyMemory<byte> s_data = Encoding.UTF8.GetBytes("Hello world!");
[Fact]
public async Task BasicTest()
{
using (QuicListener listener = new QuicListener(QuicImplementationProviders.Mock, new IPEndPoint(IPAddress.Loopback, 0), sslServerAuthenticationOptions: null))
{
listener.Start();
IPEndPoint listenEndPoint = listener.ListenEndPoint;
await Task.WhenAll(
Task.Run(async () =>
{
// Client code
using (QuicConnection connection = new QuicConnection(QuicImplementationProviders.Mock, listenEndPoint, sslClientAuthenticationOptions: null))
{
await connection.ConnectAsync();
using (QuicStream stream = connection.OpenBidirectionalStream())
{
await stream.WriteAsync(s_data);
}
}
}),
Task.Run(async () =>
{
// Server code
using (QuicConnection connection = await listener.AcceptConnectionAsync())
{
using (QuicStream stream = await connection.AcceptStreamAsync())
{
byte[] buffer = new byte[s_data.Length];
int bytesRead = await stream.ReadAsync(buffer);
Assert.Equal(s_data.Length, bytesRead);
Assert.True(s_data.Span.SequenceEqual(buffer));
}
}
}));
}
}
[Fact]
public async Task TestStreams()
public async Task AcceptStream_ConnectionAborted_ByClient_Throws()
{
using (QuicListener listener = new QuicListener(QuicImplementationProviders.Mock, new IPEndPoint(IPAddress.Loopback, 0), sslServerAuthenticationOptions: null))
{
listener.Start();
IPEndPoint listenEndPoint = listener.ListenEndPoint;
using (QuicConnection clientConnection = new QuicConnection(QuicImplementationProviders.Mock, listenEndPoint, sslClientAuthenticationOptions: null))
{
Assert.False(clientConnection.Connected);
Assert.Equal(listenEndPoint, clientConnection.RemoteEndPoint);
ValueTask connectTask = clientConnection.ConnectAsync();
QuicConnection serverConnection = await listener.AcceptConnectionAsync();
await connectTask;
Assert.True(clientConnection.Connected);
Assert.True(serverConnection.Connected);
Assert.Equal(listenEndPoint, serverConnection.LocalEndPoint);
Assert.Equal(listenEndPoint, clientConnection.RemoteEndPoint);
Assert.Equal(clientConnection.LocalEndPoint, serverConnection.RemoteEndPoint);
const int ExpectedErrorCode = 1234;
await CreateAndTestBidirectionalStream(clientConnection, serverConnection);
await CreateAndTestBidirectionalStream(serverConnection, clientConnection);
await CreateAndTestUnidirectionalStream(serverConnection, clientConnection);
await CreateAndTestUnidirectionalStream(clientConnection, serverConnection);
}
}
}
private static async Task CreateAndTestBidirectionalStream(QuicConnection c1, QuicConnection c2)
{
using (QuicStream s1 = c1.OpenBidirectionalStream())
{
Assert.True(s1.CanRead);
Assert.True(s1.CanWrite);
using var sync = new SemaphoreSlim(0);
ValueTask writeTask = s1.WriteAsync(s_data);
using (QuicStream s2 = await c2.AcceptStreamAsync())
await RunClientServer(
async clientConnection =>
{
await ReceiveDataAsync(s_data, s2);
await writeTask;
await TestBidirectionalStream(s1, s2);
}
}
}
private static async Task CreateAndTestUnidirectionalStream(QuicConnection c1, QuicConnection c2)
{
using (QuicStream s1 = c1.OpenUnidirectionalStream())
{
Assert.False(s1.CanRead);
Assert.True(s1.CanWrite);
ValueTask writeTask = s1.WriteAsync(s_data);
using (QuicStream s2 = await c2.AcceptStreamAsync())
await clientConnection.CloseAsync(ExpectedErrorCode);
sync.Release();
},
async serverConnection =>
{
await ReceiveDataAsync(s_data, s2);
await writeTask;
await TestUnidirectionalStream(s1, s2);
}
}
}
private static async Task TestBidirectionalStream(QuicStream s1, QuicStream s2)
{
Assert.True(s1.CanRead);
Assert.True(s1.CanWrite);
Assert.True(s2.CanRead);
Assert.True(s2.CanWrite);
Assert.Equal(s1.StreamId, s2.StreamId);
await SendAndReceiveDataAsync(s_data, s1, s2);
await SendAndReceiveDataAsync(s_data, s2, s1);
await SendAndReceiveDataAsync(s_data, s2, s1);
await SendAndReceiveDataAsync(s_data, s1, s2);
await SendAndReceiveEOFAsync(s1, s2);
await SendAndReceiveEOFAsync(s2, s1);
}
private static async Task TestUnidirectionalStream(QuicStream s1, QuicStream s2)
{
Assert.False(s1.CanRead);
Assert.True(s1.CanWrite);
Assert.True(s2.CanRead);
Assert.False(s2.CanWrite);
Assert.Equal(s1.StreamId, s2.StreamId);
await SendAndReceiveDataAsync(s_data, s1, s2);
await SendAndReceiveDataAsync(s_data, s1, s2);
await SendAndReceiveEOFAsync(s1, s2);
}
private static async Task SendAndReceiveDataAsync(ReadOnlyMemory<byte> data, QuicStream s1, QuicStream s2)
{
await s1.WriteAsync(data);
await ReceiveDataAsync(data, s2);
}
private static async Task ReceiveDataAsync(ReadOnlyMemory<byte> data, QuicStream s)
{
Memory<byte> readBuffer = new byte[data.Length];
int bytesRead = 0;
while (bytesRead < data.Length)
{
bytesRead += await s.ReadAsync(readBuffer.Slice(bytesRead));
}
Assert.True(data.Span.SequenceEqual(readBuffer.Span));
}
private static async Task SendAndReceiveEOFAsync(QuicStream s1, QuicStream s2)
{
byte[] readBuffer = new byte[1];
await s1.WriteAsync(Memory<byte>.Empty, endStream: true);
await s1.ShutdownWriteCompleted();
int bytesRead = await s2.ReadAsync(readBuffer);
Assert.Equal(0, bytesRead);
// Another read should still give EOF
bytesRead = await s2.ReadAsync(readBuffer);
Assert.Equal(0, bytesRead);
await sync.WaitAsync();
QuicConnectionAbortedException ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => serverConnection.AcceptStreamAsync().AsTask());
Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
});
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Xunit;
namespace System.Net.Quic.Tests
{
[ConditionalClass(typeof(QuicConnection), nameof(QuicConnection.IsQuicSupported))]
public class QuicListenerTests : MsQuicTestBase
{
[ActiveIssue("https://github.com/dotnet/runtime/issues/32048")]
[Fact]
public async Task Listener_Backlog_Success()
{
await Task.Run(async () =>
{
using QuicListener listener = CreateQuicListener();
using QuicConnection clientConnection = CreateQuicConnection(listener.ListenEndPoint);
await clientConnection.ConnectAsync();
using QuicConnection serverConnection = await listener.AcceptConnectionAsync();
}).TimeoutAfter(millisecondsTimeout: 5_000);
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.Net.Quic.Tests
{
[ConditionalClass(typeof(QuicConnection), nameof(QuicConnection.IsQuicSupported))]
public class QuicStreamTests : MsQuicTestBase
{
[Theory]
[MemberData(nameof(ReadWrite_Random_Success_Data))]
public async Task ReadWrite_Random_Success(int readSize, int writeSize)
{
byte[] testBuffer = new byte[8192];
new Random().NextBytes(testBuffer);
await RunClientServer(
async clientConnection =>
{
await using QuicStream clientStream = clientConnection.OpenUnidirectionalStream();
ReadOnlyMemory<byte> sendBuffer = testBuffer;
while (sendBuffer.Length != 0)
{
ReadOnlyMemory<byte> chunk = sendBuffer.Slice(0, Math.Min(sendBuffer.Length, writeSize));
await clientStream.WriteAsync(chunk);
sendBuffer = sendBuffer.Slice(chunk.Length);
}
clientStream.Shutdown();
await clientStream.ShutdownWriteCompleted();
},
async serverConnection =>
{
await using QuicStream serverStream = await serverConnection.AcceptStreamAsync();
byte[] receiveBuffer = new byte[testBuffer.Length];
int totalBytesRead = 0;
while (totalBytesRead != receiveBuffer.Length)
{
int bytesRead = await serverStream.ReadAsync(receiveBuffer.AsMemory(totalBytesRead, Math.Min(receiveBuffer.Length - totalBytesRead, readSize)));
if (bytesRead == 0)
{
break;
}
totalBytesRead += bytesRead;
}
Assert.True(receiveBuffer.AsSpan().SequenceEqual(testBuffer));
});
}
public static IEnumerable<object[]> ReadWrite_Random_Success_Data()
{
IEnumerable<int> sizes = Enumerable.Range(1, 8).Append(2048).Append(8192);
return
from readSize in sizes
from writeSize in sizes
select new object[] { readSize, writeSize };
}
[ActiveIssue("https://github.com/dotnet/runtime/issues/32049")]
[Fact]
public async Task Read_StreamAborted_Throws()
{
const int ExpectedErrorCode = 1234;
await Task.Run(async () =>
{
using QuicListener listener = CreateQuicListener();
ValueTask<QuicConnection> serverConnectionTask = listener.AcceptConnectionAsync();
using QuicConnection clientConnection = CreateQuicConnection(listener.ListenEndPoint);
await clientConnection.ConnectAsync();
using QuicConnection serverConnection = await serverConnectionTask;
await using QuicStream clientStream = clientConnection.OpenBidirectionalStream();
await clientStream.WriteAsync(new byte[1]);
await using QuicStream serverStream = await serverConnection.AcceptStreamAsync();
await serverStream.ReadAsync(new byte[1]);
clientStream.AbortWrite(ExpectedErrorCode);
byte[] buffer = new byte[100];
QuicStreamAbortedException ex = await Assert.ThrowsAsync<QuicStreamAbortedException>(() => serverStream.ReadAsync(buffer).AsTask());
Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
}).TimeoutAfter(millisecondsTimeout: 5_000);
}
[ActiveIssue("https://github.com/dotnet/runtime/issues/32050")]
[Fact]
public async Task Read_ConnectionAborted_Throws()
{
const int ExpectedErrorCode = 1234;
await Task.Run(async () =>
{
using QuicListener listener = CreateQuicListener();
ValueTask<QuicConnection> serverConnectionTask = listener.AcceptConnectionAsync();
using QuicConnection clientConnection = CreateQuicConnection(listener.ListenEndPoint);
await clientConnection.ConnectAsync();
using QuicConnection serverConnection = await serverConnectionTask;
await using QuicStream clientStream = clientConnection.OpenBidirectionalStream();
await clientStream.WriteAsync(new byte[1]);
await using QuicStream serverStream = await serverConnection.AcceptStreamAsync();
await serverStream.ReadAsync(new byte[1]);
await clientConnection.CloseAsync(ExpectedErrorCode);
byte[] buffer = new byte[100];
QuicConnectionAbortedException ex = await Assert.ThrowsAsync<QuicConnectionAbortedException>(() => serverStream.ReadAsync(buffer).AsTask());
Assert.Equal(ExpectedErrorCode, ex.ErrorCode);
}).TimeoutAfter(millisecondsTimeout: 5_000);
}
}
}
......@@ -13,5 +13,7 @@
<Compile Include="$(CommonTestPath)\System\Threading\Tasks\TaskTimeoutExtensions.cs">
<Link>Common\System\Threading\Tasks\TaskTimeoutExtensions.cs</Link>
</Compile>
<Compile Include="QuicListenerTests.cs" />
<Compile Include="QuicStreamTests.cs" />
</ItemGroup>
</Project>
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册