diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index a9bf3de2cc56a6e7d975b2d23d4e5ef4346962d8..3a9066db918a0f455011c536d39d6cd6fd3f3f29 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -113,9 +113,10 @@ private abstract class AsyncOperation : IThreadPoolWorkItem private enum State { Waiting = 0, - Running = 1, - Complete = 2, - Cancelled = 3 + Running, + RunningWithPendingCancellation, + Complete, + Canceled } private int _state; // Actually AsyncOperation.State. @@ -149,92 +150,103 @@ public void Reset() #endif } - public bool TryComplete(SocketAsyncContext context) + public OperationResult TryComplete(SocketAsyncContext context) { TraceWithContext(context, "Enter"); - bool result = DoTryComplete(context); - - TraceWithContext(context, $"Exit, result={result}"); + // Set state to Running, unless we've been canceled + int oldState = Interlocked.CompareExchange(ref _state, (int)State.Running, (int)State.Waiting); + if (oldState == (int)State.Canceled) + { + TraceWithContext(context, "Exit, Previously canceled"); + return OperationResult.Cancelled; + } - return result; - } + Debug.Assert(oldState == (int)State.Waiting, $"Unexpected operation state: {(State)oldState}"); - public bool TrySetRunning() - { - State oldState = (State)Interlocked.CompareExchange(ref _state, (int)State.Running, (int)State.Waiting); - if (oldState == State.Cancelled) + // Try to perform the IO + if (DoTryComplete(context)) { - // This operation has already been cancelled, and had its completion processed. - // Simply return false to indicate no further processing is needed. - return false; + Debug.Assert((State)Volatile.Read(ref _state) is State.Running or State.RunningWithPendingCancellation, "Unexpected operation state"); + + Volatile.Write(ref _state, (int)State.Complete); + + TraceWithContext(context, "Exit, Completed"); + return OperationResult.Completed; } - Debug.Assert(oldState == (int)State.Waiting); - return true; - } + // Set state back to Waiting, unless we were canceled, in which case we have to process cancellation now + int newState; + while (true) + { + int state = Volatile.Read(ref _state); + Debug.Assert(state is (int)State.Running or (int)State.RunningWithPendingCancellation, $"Unexpected operation state: {(State)state}"); - public void SetComplete() - { - Debug.Assert(Volatile.Read(ref _state) == (int)State.Running); + newState = (state == (int)State.Running ? (int)State.Waiting : (int)State.Canceled); + if (state == Interlocked.CompareExchange(ref _state, newState, state)) + { + break; + } - Volatile.Write(ref _state, (int)State.Complete); - } + // Race to update the state. Loop and try again. + } - public void SetWaiting() - { - Debug.Assert(Volatile.Read(ref _state) == (int)State.Running); + if (newState == (int)State.Canceled) + { + ProcessCancellation(); + TraceWithContext(context, "Exit, Newly cancelled"); + return OperationResult.Cancelled; + } - Volatile.Write(ref _state, (int)State.Waiting); + TraceWithContext(context, "Exit, Pending"); + return OperationResult.Pending; } public bool TryCancel() { Trace("Enter"); - // We're already canceling, so we don't need to still be hooked up to listen to cancellation. - // The cancellation request could also be caused by something other than the token, so it's - // important we clean it up, regardless. + // Note we could be cancelling because of socket close. Regardless, we don't need the registration anymore. CancellationRegistration.Dispose(); - // Try to transition from Waiting to Cancelled - SpinWait spinWait = default; - bool keepWaiting = true; - while (keepWaiting) + int newState; + while (true) { - int state = Interlocked.CompareExchange(ref _state, (int)State.Cancelled, (int)State.Waiting); - switch ((State)state) + int state = Volatile.Read(ref _state); + if (state is (int)State.Complete or (int)State.Canceled or (int)State.RunningWithPendingCancellation) { - case State.Running: - // A completion attempt is in progress. Keep busy-waiting. - Trace("Busy wait"); - spinWait.SpinOnce(); - break; + return false; + } - case State.Complete: - // A completion attempt succeeded. Consider this operation as having completed within the timeout. - Trace("Exit, previously completed"); - return false; + newState = (state == (int)State.Waiting ? (int)State.Canceled : (int)State.RunningWithPendingCancellation); + if (state == Interlocked.CompareExchange(ref _state, newState, state)) + { + break; + } - case State.Waiting: - // This operation was successfully cancelled. - // Break out of the loop to handle the cancellation - keepWaiting = false; - break; + // Race to update the state. Loop and try again. + } - case State.Cancelled: - // Someone else cancelled the operation. - // The previous canceller will have fired the completion, etc. - Trace("Exit, previously cancelled"); - return false; - } + if (newState == (int)State.RunningWithPendingCancellation) + { + // TryComplete will either succeed, or it will see the pending cancellation and deal with it. + return false; } - Trace("Cancelled, processing completion"); + ProcessCancellation(); - // The operation successfully cancelled. - // It's our responsibility to set the error code and queue the completion. - DoAbort(); + // Note, we leave the operation in the OperationQueue. + // When we get around to processing it, we'll see it's cancelled and skip it. + return true; + } + + public void ProcessCancellation() + { + Trace("Enter"); + + Debug.Assert(_state == (int)State.Canceled); + + ErrorCode = SocketError.OperationAborted; ManualResetEventSlim? e = Event; if (e != null) @@ -252,12 +264,6 @@ public bool TryCancel() // to do further processing on the item that's still in the list. ThreadPool.UnsafeQueueUserWorkItem(o => ((AsyncOperation)o!).InvokeCallback(allowPooling: false), this); } - - Trace("Exit"); - - // Note, we leave the operation in the OperationQueue. - // When we get around to processing it, we'll see it's cancelled and skip it. - return true; } public void Dispatch() @@ -306,12 +312,9 @@ void IThreadPoolWorkItem.Execute() // Called when op is not in the queue yet, so can't be otherwise executing public void DoAbort() { - Abort(); ErrorCode = SocketError.OperationAborted; } - protected abstract void Abort(); - protected abstract bool DoTryComplete(SocketAsyncContext context); public abstract void InvokeCallback(bool allowPooling); @@ -354,8 +357,6 @@ private abstract class SendOperation : WriteOperation public SendOperation(SocketAsyncContext context) : base(context) { } - protected sealed override void Abort() { } - public Action? Callback { get; set; } public override void InvokeCallback(bool allowPooling) => @@ -442,8 +443,6 @@ private abstract class ReceiveOperation : ReadOperation public ReceiveOperation(SocketAsyncContext context) : base(context) { } - protected sealed override void Abort() { } - public Action? Callback { get; set; } public override void InvokeCallback(bool allowPooling) => @@ -554,8 +553,6 @@ private sealed class ReceiveMessageFromOperation : ReadOperation public ReceiveMessageFromOperation(SocketAsyncContext context) : base(context) { } - protected sealed override void Abort() { } - public Action? Callback { get; set; } protected override bool DoTryComplete(SocketAsyncContext context) => @@ -579,8 +576,6 @@ private sealed unsafe class BufferPtrReceiveMessageFromOperation : ReadOperation public BufferPtrReceiveMessageFromOperation(SocketAsyncContext context) : base(context) { } - protected sealed override void Abort() { } - public Action? Callback { get; set; } protected override bool DoTryComplete(SocketAsyncContext context) => @@ -598,9 +593,6 @@ private sealed class AcceptOperation : ReadOperation public Action? Callback { get; set; } - protected override void Abort() => - AcceptedFileDescriptor = (IntPtr)(-1); - protected override bool DoTryComplete(SocketAsyncContext context) { bool completed = SocketPal.TryCompleteAccept(context._socket, SocketAddress!, ref SocketAddressLen, out AcceptedFileDescriptor, out ErrorCode); @@ -631,8 +623,6 @@ private sealed class ConnectOperation : WriteOperation public Action? Callback { get; set; } - protected override void Abort() { } - protected override bool DoTryComplete(SocketAsyncContext context) { bool result = SocketPal.TryCompleteConnect(context._socket, SocketAddressLen, out ErrorCode); @@ -653,8 +643,6 @@ private sealed class SendFileOperation : WriteOperation public SendFileOperation(SocketAsyncContext context) : base(context) { } - protected override void Abort() { } - public Action? Callback { get; set; } public override void InvokeCallback(bool allowPooling) => @@ -694,6 +682,13 @@ public void Dispose() } } + public enum OperationResult + { + Pending = 0, + Completed = 1, + Cancelled = 2 + } + private struct OperationQueue where TOperation : AsyncOperation { @@ -864,7 +859,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation } // Retry the operation. - if (operation.TryComplete(context)) + if (operation.TryComplete(context) != OperationResult.Pending) { Trace(context, $"Leave, retry succeeded"); return false; @@ -880,7 +875,7 @@ static void HandleFailedRegistration(SocketAsyncContext context, TOperation oper { // Because the other end close, we expect the operation to complete when we retry it. // If it doesn't, we fall through and throw an Exception. - if (operation.TryComplete(context)) + if (operation.TryComplete(context) != OperationResult.Pending) { return; } @@ -979,13 +974,6 @@ internal void ProcessAsyncOperation(TOperation op) } } - public enum OperationResult - { - Pending = 0, - Completed = 1, - Cancelled = 2 - } - public OperationResult ProcessQueuedOperation(TOperation op) { SocketAsyncContext context = op.AssociatedContext; @@ -1010,27 +998,15 @@ public OperationResult ProcessQueuedOperation(TOperation op) } } - bool wasCompleted = false; + OperationResult result; while (true) { - // Try to change the op state to Running. - // If this fails, it means the operation was previously cancelled, - // and we should just remove it from the queue without further processing. - if (!op.TrySetRunning()) - { - break; - } - - // Try to perform the IO - if (op.TryComplete(context)) + result = op.TryComplete(context); + if (result != OperationResult.Pending) { - op.SetComplete(); - wasCompleted = true; break; } - op.SetWaiting(); - // Check for retry and reset queue state. using (Lock()) @@ -1097,7 +1073,8 @@ public OperationResult ProcessQueuedOperation(TOperation op) nextOp?.Dispatch(); - return (wasCompleted ? OperationResult.Completed : OperationResult.Cancelled); + Debug.Assert(result != OperationResult.Pending); + return result; } public void CancelAndContinueProcessing(TOperation op) @@ -1360,9 +1337,9 @@ private void PerformSyncOperation(ref OperationQueue que e.Reset(); // We've been signalled to try to process the operation. - OperationQueue.OperationResult result = queue.ProcessQueuedOperation(operation); - if (result == OperationQueue.OperationResult.Completed || - result == OperationQueue.OperationResult.Cancelled) + OperationResult result = queue.ProcessQueuedOperation(operation); + if (result == OperationResult.Completed || + result == OperationResult.Cancelled) { break; }