未验证 提交 15033649 编写于 作者: J Justin Kotalik 提交者: GitHub

Close accept loop when closing connection for Quic (#44885)

上级 31ffad2c
...@@ -161,21 +161,21 @@ private uint HandleEventShutdownInitiatedByTransport(ref ConnectionEvent connect ...@@ -161,21 +161,21 @@ private uint HandleEventShutdownInitiatedByTransport(ref ConnectionEvent connect
_connectTcs.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(ex)); _connectTcs.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(ex));
} }
_acceptQueue.Writer.Complete();
return MsQuicStatusCodes.Success; return MsQuicStatusCodes.Success;
} }
private uint HandleEventShutdownInitiatedByPeer(ref ConnectionEvent connectionEvent) private uint HandleEventShutdownInitiatedByPeer(ref ConnectionEvent connectionEvent)
{ {
_abortErrorCode = connectionEvent.Data.ShutdownInitiatedByPeer.ErrorCode; _abortErrorCode = connectionEvent.Data.ShutdownInitiatedByPeer.ErrorCode;
_acceptQueue.Writer.Complete();
return MsQuicStatusCodes.Success; return MsQuicStatusCodes.Success;
} }
private uint HandleEventShutdownComplete(ref ConnectionEvent connectionEvent) private uint HandleEventShutdownComplete(ref ConnectionEvent connectionEvent)
{ {
_shutdownTcs.SetResult(MsQuicStatusCodes.Success); _shutdownTcs.SetResult(MsQuicStatusCodes.Success);
// Stop accepting new streams.
_acceptQueue?.Writer.Complete();
return MsQuicStatusCodes.Success; return MsQuicStatusCodes.Success;
} }
...@@ -291,7 +291,7 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d ...@@ -291,7 +291,7 @@ internal override ValueTask ConnectAsync(CancellationToken cancellationToken = d
private void SetCallbackHandler() private void SetCallbackHandler()
{ {
Debug.Assert(!_handle.IsAllocated); Debug.Assert(!_handle.IsAllocated, "callback handler allocated already");
_handle = GCHandle.Alloc(this); _handle = GCHandle.Alloc(this);
MsQuicApi.Api.SetCallbackHandlerDelegate( MsQuicApi.Api.SetCallbackHandlerDelegate(
...@@ -310,8 +310,6 @@ private void SetCallbackHandler() ...@@ -310,8 +310,6 @@ private void SetCallbackHandler()
ErrorCode); ErrorCode);
QuicExceptionHelpers.ThrowIfFailed(status, "Failed to shutdown connection."); QuicExceptionHelpers.ThrowIfFailed(status, "Failed to shutdown connection.");
Debug.Assert(_shutdownTcs.Task.IsCompleted == false);
return new ValueTask(_shutdownTcs.Task); return new ValueTask(_shutdownTcs.Task);
} }
......
...@@ -34,7 +34,7 @@ internal sealed class MsQuicListener : QuicListenerProvider, IDisposable ...@@ -34,7 +34,7 @@ internal sealed class MsQuicListener : QuicListenerProvider, IDisposable
private QuicListenerOptions _options; private QuicListenerOptions _options;
private volatile bool _disposed; private volatile bool _disposed;
private IPEndPoint _listenEndPoint; private IPEndPoint _listenEndPoint;
private bool _started;
private readonly Channel<MsQuicConnection> _acceptConnectionQueue; private readonly Channel<MsQuicConnection> _acceptConnectionQueue;
internal MsQuicListener(QuicListenerOptions options) internal MsQuicListener(QuicListenerOptions options)
...@@ -120,6 +120,13 @@ internal override void Start() ...@@ -120,6 +120,13 @@ internal override void Start()
{ {
ThrowIfDisposed(); ThrowIfDisposed();
// protect against double starts.
if (_started)
{
throw new QuicException("Cannot start Listener multiple times");
}
_started = true;
SetCallbackHandler(); SetCallbackHandler();
SOCKADDR_INET address = MsQuicAddressHelpers.IPEndPointToINet(_listenEndPoint); SOCKADDR_INET address = MsQuicAddressHelpers.IPEndPointToINet(_listenEndPoint);
...@@ -202,7 +209,7 @@ private void StopAcceptingConnections() ...@@ -202,7 +209,7 @@ private void StopAcceptingConnections()
internal void SetCallbackHandler() internal void SetCallbackHandler()
{ {
Debug.Assert(!_handle.IsAllocated); Debug.Assert(!_handle.IsAllocated, "listener allocated");
_handle = GCHandle.Alloc(this); _handle = GCHandle.Alloc(this);
MsQuicApi.Api.SetCallbackHandlerDelegate( MsQuicApi.Api.SetCallbackHandlerDelegate(
......
...@@ -71,7 +71,7 @@ internal sealed class MsQuicStream : QuicStreamProvider ...@@ -71,7 +71,7 @@ internal sealed class MsQuicStream : QuicStreamProvider
// Creates a new MsQuicStream // Creates a new MsQuicStream
internal MsQuicStream(MsQuicConnection connection, QUIC_STREAM_OPEN_FLAG flags, IntPtr nativeObjPtr, bool inbound) internal MsQuicStream(MsQuicConnection connection, QUIC_STREAM_OPEN_FLAG flags, IntPtr nativeObjPtr, bool inbound)
{ {
Debug.Assert(connection != null); Debug.Assert(connection != null, "Connection null");
_ptr = nativeObjPtr; _ptr = nativeObjPtr;
...@@ -936,7 +936,7 @@ private void SetCallbackHandler() ...@@ -936,7 +936,7 @@ private void SetCallbackHandler()
/// </summary> /// </summary>
private void StartLocalStream() private void StartLocalStream()
{ {
Debug.Assert(!_started); Debug.Assert(!_started, "start local stream");
uint status = MsQuicApi.Api.StreamStartDelegate( uint status = MsQuicApi.Api.StreamStartDelegate(
_ptr, _ptr,
(uint)QUIC_STREAM_START_FLAG.ASYNC); (uint)QUIC_STREAM_START_FLAG.ASYNC);
......
...@@ -177,6 +177,23 @@ public async Task CallDifferentWriteMethodsWorks() ...@@ -177,6 +177,23 @@ public async Task CallDifferentWriteMethodsWorks()
Assert.Equal(24, res); Assert.Equal(24, res);
} }
[Fact]
public async Task CloseAsync_ByServer_AcceptThrows()
{
await RunClientServer(
clientConnection =>
{
return Task.CompletedTask;
},
async serverConnection =>
{
var acceptTask = serverConnection.AcceptStreamAsync();
await serverConnection.CloseAsync(errorCode: 0);
// make sure
await Assert.ThrowsAsync<QuicOperationAbortedException>(() => acceptTask.AsTask());
});
}
private static ReadOnlySequence<byte> CreateReadOnlySequenceFromBytes(byte[] data) private static ReadOnlySequence<byte> CreateReadOnlySequenceFromBytes(byte[] data)
{ {
List<byte[]> segments = new List<byte[]> List<byte[]> segments = new List<byte[]>
......
...@@ -21,10 +21,11 @@ public async Task Listener_Backlog_Success() ...@@ -21,10 +21,11 @@ public async Task Listener_Backlog_Success()
using QuicListener listener = CreateQuicListener(); using QuicListener listener = CreateQuicListener();
using QuicConnection clientConnection = CreateQuicConnection(listener.ListenEndPoint); using QuicConnection clientConnection = CreateQuicConnection(listener.ListenEndPoint);
await clientConnection.ConnectAsync(); var clientStreamTask = clientConnection.ConnectAsync();
using QuicConnection serverConnection = await listener.AcceptConnectionAsync(); using QuicConnection serverConnection = await listener.AcceptConnectionAsync();
}).TimeoutAfter(millisecondsTimeout: 5_000); await clientStreamTask;
}).TimeoutAfter(millisecondsTimeout: 6_000);
} }
} }
......
...@@ -305,8 +305,6 @@ public async Task LargeDataSentAndReceived() ...@@ -305,8 +305,6 @@ public async Task LargeDataSentAndReceived()
} }
} }
[Fact] [Fact]
public async Task TestStreams() public async Task TestStreams()
{ {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册