提交 1cc5f8c2 编写于 作者: S Stephen Toub 提交者: GitHub

Use new threading-related APIs in Channels (dotnet/corefx#33080)

- Avoid ThreadPool-related allocations via IThreadPoolWorkItem.  We already had a fairly low allocation profile on most channels, thanks to an IValueTaskSource implementation.  This extends that implementation with an IThreadPoolWorkItem implementation so that when we do need to queue to the pool (e.g. to support RunContinuationsAsynchronously, on writes on the bounded queue, etc.), we can do so without incurring additional allocation.
- Avoid ExecutionContext costs with CancellationToken.UnsafeRegister.  Minor savings when a cancelable token is provided; we don't need to flow context as all we're doing is completing another object.

Commit migrated from https://github.com/dotnet/corefx/commit/89ab1e83a7e00d869e1580151e24f01226acaf3f
上级 30471ffb
......@@ -4,7 +4,7 @@
<PackageConfigurations>
netstandard1.3;
netstandard;
netcoreapp2.1;
netcoreapp;
</PackageConfigurations>
<BuildConfigurations>
$(PackageConfigurations);
......
......@@ -3,12 +3,14 @@
<ProjectGuid>{AAADA5D3-CF64-4E9D-943C-EFDC006D6366}</ProjectGuid>
<RootNamespace>System.Threading.Channels</RootNamespace>
<DocumentationFile>$(OutputPath)$(MSBuildProjectName).xml</DocumentationFile>
<Configurations>netcoreapp-Debug;netcoreapp-Release;netcoreapp2.1-Debug;netcoreapp2.1-Release;netstandard-Debug;netstandard-Release;netstandard1.3-Debug;netstandard1.3-Release</Configurations>
<Configurations>netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release;netstandard1.3-Debug;netstandard1.3-Release</Configurations>
</PropertyGroup>
<ItemGroup>
<Compile Include="System\VoidResult.cs" />
<Compile Include="System\Collections\Generic\Deque.cs" />
<Compile Include="System\Threading\Channels\AsyncOperation.cs" />
<Compile Include="System\Threading\Channels\AsyncOperation.netcoreapp.cs" Condition="'$(TargetGroup)' == 'netcoreapp'" />
<Compile Include="System\Threading\Channels\AsyncOperation.netstandard.cs" Condition="'$(TargetGroup)' != 'netcoreapp'" />
<Compile Include="System\Threading\Channels\BoundedChannel.cs" />
<Compile Include="System\Threading\Channels\BoundedChannelFullMode.cs" />
<Compile Include="System\Threading\Channels\Channel.cs" />
......@@ -37,6 +39,7 @@
<Reference Include="System.Runtime" />
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Threading" />
<Reference Include="System.Threading.ThreadPool" />
<Reference Include="System.Threading.Tasks" />
<Reference Include="System.Threading.Tasks.Extensions" />
</ItemGroup>
......
......@@ -12,9 +12,12 @@ namespace System.Threading.Channels
internal abstract class AsyncOperation
{
/// <summary>Sentinel object used in a field to indicate the operation is available for use.</summary>
protected static readonly Action<object> s_availableSentinel = new Action<object>(s => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(s_availableSentinel)} invoked with {s}."));
protected static readonly Action<object> s_availableSentinel = AvailableSentinel; // named method to help with debugging
private static void AvailableSentinel(object s) => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(AvailableSentinel)} invoked with {s}");
/// <summary>Sentinel object used in a field to indicate the operation has completed.</summary>
protected static readonly Action<object> s_completedSentinel = new Action<object>(s => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(s_completedSentinel)} invoked with {s}"));
protected static readonly Action<object> s_completedSentinel = CompletedSentinel; // named method to help with debugging
private static void CompletedSentinel(object s) => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(CompletedSentinel)} invoked with {s}");
/// <summary>Throws an exception indicating that the operation's result was accessed before the operation completed.</summary>
protected static void ThrowIncompleteOperationException() =>
......@@ -31,7 +34,7 @@ internal abstract class AsyncOperation
/// <summary>The representation of an asynchronous operation that has a result value.</summary>
/// <typeparam name="TResult">Specifies the type of the result. May be <see cref="VoidResult"/>.</typeparam>
internal class AsyncOperation<TResult> : AsyncOperation, IValueTaskSource, IValueTaskSource<TResult>
internal partial class AsyncOperation<TResult> : AsyncOperation, IValueTaskSource, IValueTaskSource<TResult>
{
/// <summary>Registration with a provided cancellation token.</summary>
private readonly CancellationTokenRegistration _registration;
......@@ -85,7 +88,7 @@ public AsyncOperation(bool runContinuationsAsynchronously, CancellationToken can
{
Debug.Assert(!_pooled, "Cancelable operations can't be pooled");
CancellationToken = cancellationToken;
_registration = cancellationToken.Register(s =>
_registration = UnsafeRegister(cancellationToken, s =>
{
var thisRef = (AsyncOperation<TResult>)s;
thisRef.TrySetCanceled(thisRef.CancellationToken);
......@@ -106,17 +109,16 @@ public AsyncOperation(bool runContinuationsAsynchronously, CancellationToken can
/// <param name="token">The token that must match <see cref="_currentId"/>.</param>
public ValueTaskSourceStatus GetStatus(short token)
{
if (_currentId == token)
if (_currentId != token)
{
return
!IsCompleted ? ValueTaskSourceStatus.Pending :
_error == null ? ValueTaskSourceStatus.Succeeded :
_error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
ValueTaskSourceStatus.Faulted;
ThrowIncorrectCurrentIdException();
}
ThrowIncorrectCurrentIdException();
return default; // just to satisfy compiler
return
!IsCompleted ? ValueTaskSourceStatus.Pending :
_error == null ? ValueTaskSourceStatus.Succeeded :
_error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
ValueTaskSourceStatus.Faulted;
}
/// <summary>Gets whether the operation has completed.</summary>
......@@ -274,8 +276,14 @@ public void OnCompleted(Action<object> continuation, object state, short token,
ThrowMultipleContinuations();
}
// Queue the continuation.
if (sc != null)
// Queue the continuation. We always queue here, even if !RunContinuationsAsynchronously, in order
// to avoid stack diving; this path happens in the rare race when we're setting up to await and the
// object is completed after the awaiter.IsCompleted but before the awaiter.OnCompleted.
if (_schedulingContext == null)
{
QueueUserWorkItem(continuation, state);
}
else if (sc != null)
{
sc.Post(s =>
{
......@@ -285,7 +293,8 @@ public void OnCompleted(Action<object> continuation, object state, short token,
}
else
{
Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts ?? TaskScheduler.Default);
Debug.Assert(ts != null);
Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
}
}
}
......@@ -364,70 +373,68 @@ private void SignalCompletion()
{
if (_continuation != null || Interlocked.CompareExchange(ref _continuation, s_completedSentinel, null) != null)
{
ExecutionContext ec = _executionContext;
if (ec != null)
Debug.Assert(_continuation != s_completedSentinel, $"The continuation was the completion sentinel.");
Debug.Assert(_continuation != s_availableSentinel, $"The continuation was the available sentinel.");
if (_schedulingContext == null)
{
ExecutionContext.Run(ec, s => ((AsyncOperation<TResult>)s).SignalCompletionCore(), this);
// There's no captured scheduling context. If we're forced to run continuations asynchronously, queue it.
// Otherwise fall through to invoke it synchronously.
if (_runContinuationsAsynchronously)
{
UnsafeQueueSetCompletionAndInvokeContinuation();
return;
}
}
else if (_schedulingContext is SynchronizationContext sc)
{
// There's a captured synchronization context. If we're forced to run continuations asynchronously,
// or if there's a current synchronization context that's not the one we're targeting, queue it.
// Otherwise fall through to invoke it synchronously.
if (_runContinuationsAsynchronously || sc != SynchronizationContext.Current)
{
sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
return;
}
}
else
{
SignalCompletionCore();
// There's a captured TaskScheduler. If we're forced to run continuations asynchronously,
// or if there's a current scheduler that's not the one we're targeting, queue it.
// Otherwise fall through to invoke it synchronously.
TaskScheduler ts = (TaskScheduler)_schedulingContext;
Debug.Assert(ts != null, "Expected a TaskScheduler");
if (_runContinuationsAsynchronously || ts != TaskScheduler.Current)
{
Task.Factory.StartNew(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
return;
}
}
// Invoke the continuation synchronously.
SetCompletionAndInvokeContinuation();
}
}
/// <summary>Invokes the registered continuation; separated out of SignalCompletion for convenience so that it may be invoked on multiple code paths.</summary>
private void SignalCompletionCore()
private void SetCompletionAndInvokeContinuation()
{
Debug.Assert(_continuation != s_completedSentinel, $"The continuation was the completion sentinel.");
Debug.Assert(_continuation != s_availableSentinel, $"The continuation was the available sentinel.");
if (_schedulingContext == null)
{
// There's no captured scheduling context. If we're forced to run continuations asynchronously, queue it.
// Otherwise fall through to invoke it synchronously.
if (_runContinuationsAsynchronously)
{
Task.Factory.StartNew(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
return;
}
}
else if (_schedulingContext is SynchronizationContext sc)
if (_executionContext == null)
{
// There's a captured synchronization context. If we're forced to run continuations asynchronously,
// or if there's a current synchronization context that's not the one we're targeting, queue it.
// Otherwise fall through to invoke it synchronously.
if (_runContinuationsAsynchronously || sc != SynchronizationContext.Current)
{
sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
return;
}
Action<object> c = _continuation;
_continuation = s_completedSentinel;
c(_continuationState);
}
else
{
// There's a captured TaskScheduler. If we're forced to run continuations asynchronously,
// or if there's a current scheduler that's not the one we're targeting, queue it.
// Otherwise fall through to invoke it synchronously.
TaskScheduler ts = (TaskScheduler)_schedulingContext;
Debug.Assert(ts != null, "Expected a TaskScheduler");
if (_runContinuationsAsynchronously || ts != TaskScheduler.Current)
ExecutionContext.Run(_executionContext, s =>
{
Task.Factory.StartNew(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
return;
}
var thisRef = (AsyncOperation<TResult>)s;
Action<object> c = thisRef._continuation;
thisRef._continuation = s_completedSentinel;
c(thisRef._continuationState);
}, this);
}
// Invoke the continuation synchronously.
SetCompletionAndInvokeContinuation();
}
private void SetCompletionAndInvokeContinuation()
{
Action<object> c = _continuation;
_continuation = s_completedSentinel;
c(_continuationState);
}
}
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
namespace System.Threading.Channels
{
internal partial class AsyncOperation<TResult> : IThreadPoolWorkItem
{
void IThreadPoolWorkItem.Execute() => SetCompletionAndInvokeContinuation();
private void UnsafeQueueSetCompletionAndInvokeContinuation() =>
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
private static void QueueUserWorkItem(Action<object> action, object state) =>
ThreadPool.QueueUserWorkItem(action, state, preferLocal: false);
private static CancellationTokenRegistration UnsafeRegister(CancellationToken cancellationToken, Action<object> action, object state) =>
cancellationToken.UnsafeRegister(action, state);
}
}
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Threading.Tasks;
namespace System.Threading.Channels
{
internal partial class AsyncOperation<TResult>
{
private void UnsafeQueueSetCompletionAndInvokeContinuation() =>
Task.Factory.StartNew(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
private static void QueueUserWorkItem(Action<object> action, object state) =>
Task.Factory.StartNew(action, state,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
private static CancellationTokenRegistration UnsafeRegister(CancellationToken cancellationToken, Action<object> action, object state) =>
cancellationToken.Register(action, state);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册