未验证 提交 702c1ea4 编写于 作者: R Radek Zikmund 提交者: GitHub

Small MsQuicStream refactorings (#70433)

* Inline state transition helpers

Fixes #55437

* Add high level comments for HandleEventReceive and ReadAsync
上级 cd60639f
......@@ -292,52 +292,6 @@ private async ValueTask WriteAsync<TBuffer>(Action<State, TBuffer> stateSetup, T
{
ThrowIfDisposed();
using CancellationTokenRegistration registration = SetupWriteStartState(isEmpty, cancellationToken);
await WriteAsyncCore<TBuffer>(stateSetup, buffer, isEmpty, endStream).ConfigureAwait(false);
CleanupWriteCompletedState();
}
private unsafe ValueTask WriteAsyncCore<TBuffer>(Action<State, TBuffer> stateSetup, TBuffer buffer, bool isEmpty, bool endStream)
{
if (isEmpty)
{
if (endStream)
{
// Start graceful shutdown sequence if passed in the fin flag and there is an empty buffer.
StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0);
}
return default;
}
stateSetup(_state, buffer);
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
int status = MsQuicApi.Api.ApiTable->StreamSend(
_state.Handle.QuicHandle,
_state.SendBuffers.Buffers,
(uint)_state.SendBuffers.Count,
endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE,
(void*)IntPtr.Zero);
if (StatusFailed(status))
{
CleanupWriteFailedState();
CleanupSendState(_state);
if (status == QUIC_STATUS_ABORTED)
{
throw ThrowHelper.GetConnectionAbortedException(_state.ConnectionState.AbortErrorCode);
}
ThrowIfFailure(status, "Could not send data to peer.");
}
return _state.SendResettableCompletionSource.GetTypelessValueTask();
}
private CancellationTokenRegistration SetupWriteStartState(bool emptyBuffer, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
lock (_state)
......@@ -369,7 +323,7 @@ private CancellationTokenRegistration SetupWriteStartState(bool emptyBuffer, Can
}
// if token was already cancelled, this would execute synchronously
CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) =>
using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) =>
{
var state = (State)s!;
bool shouldComplete = false;
......@@ -417,14 +371,11 @@ private CancellationTokenRegistration SetupWriteStartState(bool emptyBuffer, Can
// Change the state in the same lock where we check for final states to prevent coming back from Aborted/ConnectionClosed.
Debug.Assert(_state.SendState != SendState.Pending);
_state.SendState = emptyBuffer ? SendState.Finished : SendState.Pending;
_state.SendState = isEmpty ? SendState.Finished : SendState.Pending;
}
return registration;
}
await WriteAsyncCore<TBuffer>(stateSetup, buffer, isEmpty, endStream).ConfigureAwait(false);
private void CleanupWriteCompletedState()
{
lock (_state)
{
if (_state.SendState == SendState.Finished)
......@@ -434,19 +385,57 @@ private void CleanupWriteCompletedState()
}
}
private void CleanupWriteFailedState()
private unsafe ValueTask WriteAsyncCore<TBuffer>(Action<State, TBuffer> stateSetup, TBuffer buffer, bool isEmpty, bool endStream)
{
lock (_state)
if (isEmpty)
{
if (_state.SendState == SendState.Pending)
if (endStream)
{
_state.SendState = SendState.Finished;
// Start graceful shutdown sequence if passed in the fin flag and there is an empty buffer.
StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0);
}
return default;
}
stateSetup(_state, buffer);
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
int status = MsQuicApi.Api.ApiTable->StreamSend(
_state.Handle.QuicHandle,
_state.SendBuffers.Buffers,
(uint)_state.SendBuffers.Count,
endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE,
(void*)IntPtr.Zero);
if (StatusFailed(status))
{
lock (_state)
{
if (_state.SendState == SendState.Pending)
{
_state.SendState = SendState.Finished;
}
}
CleanupSendState(_state);
if (status == QUIC_STATUS_ABORTED)
{
throw ThrowHelper.GetConnectionAbortedException(_state.ConnectionState.AbortErrorCode);
}
ThrowIfFailure(status, "Could not send data to peer.");
}
return _state.SendResettableCompletionSource.GetTypelessValueTask();
}
internal override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
//
// If MsQuic indicated that some data were received (QUIC_STREAM_EVENT_RECEIVE), we use it to complete the request
// synchronously. Otherwise we setup the request to be completed by the HandleEventReceive handler.
//
ThrowIfDisposed();
if (_state.ReadState == ReadState.Closed)
......@@ -1009,6 +998,13 @@ private static unsafe int NativeCallback(QUIC_HANDLE* stream, void* context, QUI
private static unsafe int HandleEventReceive(State state, ref QUIC_STREAM_EVENT streamEvent)
{
//
// Handle MsQuic QUIC_STREAM_EVENT_RECEIVE event
//
// If there is a pending ReadAsync call, then we complete it. Otherwise we keep a pointer to the received data
// and use it to complete the next ReadAsync operation synchronously.
//
ref var receiveEvent = ref streamEvent.RECEIVE;
if (NetEventSource.Log.IsEnabled())
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册