未验证 提交 a7a2fd65 编写于 作者: K Koundinya Veluri 提交者: GitHub

Improve the rate of thread injection for blocking due to sync-over-async (#53471)

* Improve the rate of thread injection for blocking due to sync-over-async

Fixes https://github.com/dotnet/runtime/issues/52558
上级 27baae9f
......@@ -522,6 +522,15 @@ internal static void NotifyWorkItemProgress()
[MethodImpl(MethodImplOptions.InternalCall)]
private static extern void NotifyWorkItemProgressNative();
internal static bool NotifyThreadBlocked() =>
UsePortableThreadPool && PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked();
internal static void NotifyThreadUnblocked()
{
Debug.Assert(UsePortableThreadPool);
PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked();
}
internal static object? GetOrCreateThreadLocalCompletionCountObject() =>
UsePortableThreadPool ? PortableThreadPool.ThreadPoolInstance.GetOrCreateThreadLocalCompletionCountObject() : null;
......
......@@ -2167,9 +2167,10 @@
<ItemGroup Condition="'$(FeaturePortableThreadPool)' == 'true'">
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPool.Portable.cs" Condition="'$(FeatureCoreCLR)' != 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPoolBoundHandle.PlatformNotSupported.cs" Condition="'$(FeatureCoreCLR)' != 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\NativeRuntimeEventSource.PortableThreadPool.cs" Condition="'$(FeatureCoreCLR)' != 'true' and '$(FeatureMono)' != 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\NativeRuntimeEventSource.PortableThreadPool.NativeSinks.cs" Condition="'$(FeatureCoreCLR)' == 'true' or '$(FeatureMono)' == 'true'"/>
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Blocking.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.GateThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.HillClimbing.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.HillClimbing.Complex.cs" />
......
......@@ -66,7 +66,8 @@ public enum ThreadAdjustmentReasonMap : uint
ChangePoint,
Stabilizing,
Starvation,
ThreadTimedOut
ThreadTimedOut,
CooperativeBlocking,
}
#if !ES_BUILD_STANDALONE
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Diagnostics;
namespace System.Threading
{
// Blocking-compensation portion of the portable thread pool. When a thread-pool thread
// cooperatively reports that it is blocked (sync-over-async), the pool temporarily raises
// the thread count goal to compensate, and lowers it again when the thread unblocks.
internal sealed partial class PortableThreadPool
{
    // Effective minimum for the thread count goal. While threads are cooperatively blocked,
    // this floor rises (via TargetThreadsGoalForBlockingAdjustment) so that hill climbing
    // and thread timeout do not shrink the pool below what blocking compensation requires.
    // Must be read under _threadAdjustmentLock.
    public short MinThreadsGoal
    {
        get
        {
            _threadAdjustmentLock.VerifyIsLocked();
            return Math.Min(_separated.numThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
        }
    }
    // Thread count goal that blocking adjustment works toward: the configured minimum plus
    // one thread per cooperatively blocked thread, capped at the configured maximum.
    // The ushort casts avoid overflow of the short addition. Must be read under
    // _threadAdjustmentLock.
    private short TargetThreadsGoalForBlockingAdjustment
    {
        get
        {
            _threadAdjustmentLock.VerifyIsLocked();
            return
                _numBlockedThreads <= 0
                    ? _minThreads
                    : (short)Math.Min((ushort)(_minThreads + _numBlockedThreads), (ushort)_maxThreads);
        }
    }
    // Called when a thread-pool thread is about to block cooperatively. Returns false when
    // cooperative blocking is disabled by config or the caller is not a thread-pool thread
    // (in which case the caller must not call NotifyThreadUnblocked later). Otherwise
    // registers the blocked thread and, if the current goal is below the blocking target,
    // schedules a delayed adjustment on the gate thread, waking it only on the None ->
    // WithDelayIfNecessary transition to avoid redundant wakes.
    public bool NotifyThreadBlocked()
    {
        if (!BlockingConfig.IsCooperativeBlockingEnabled || !Thread.CurrentThread.IsThreadPoolThread)
        {
            return false;
        }
        bool wakeGateThread = false;
        _threadAdjustmentLock.Acquire();
        try
        {
            _numBlockedThreads++;
            Debug.Assert(_numBlockedThreads > 0);
            if (_pendingBlockingAdjustment != PendingBlockingAdjustment.WithDelayIfNecessary &&
                _separated.numThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
            {
                if (_pendingBlockingAdjustment == PendingBlockingAdjustment.None)
                {
                    wakeGateThread = true;
                }
                _pendingBlockingAdjustment = PendingBlockingAdjustment.WithDelayIfNecessary;
            }
        }
        finally
        {
            _threadAdjustmentLock.Release();
        }
        // Wake outside the lock to keep the lock hold time short
        if (wakeGateThread)
        {
            GateThread.Wake(this);
        }
        return true;
    }
    // Called when a thread that previously got 'true' from NotifyThreadBlocked unblocks.
    // Decrements the blocked count; if threads were added due to blocking and the goal now
    // exceeds the (reduced) target, requests an immediate adjustment so the extra threads
    // can be trimmed without waiting for a delay.
    public void NotifyThreadUnblocked()
    {
        Debug.Assert(BlockingConfig.IsCooperativeBlockingEnabled);
        Debug.Assert(Thread.CurrentThread.IsThreadPoolThread);
        bool wakeGateThread = false;
        _threadAdjustmentLock.Acquire();
        try
        {
            Debug.Assert(_numBlockedThreads > 0);
            _numBlockedThreads--;
            if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately &&
                _numThreadsAddedDueToBlocking > 0 &&
                _separated.numThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
            {
                wakeGateThread = true;
                _pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately;
            }
        }
        finally
        {
            _threadAdjustmentLock.Release();
        }
        if (wakeGateThread)
        {
            GateThread.Wake(this);
        }
    }
    // Entry point used by the gate thread. Performs the adjustment under the lock, then adds
    // a working worker (if one became warranted) outside the lock. Returns the delay in
    // milliseconds before the next adjustment should run, or 0 for no further delay-driven
    // adjustment.
    private uint PerformBlockingAdjustment(bool previousDelayElapsed)
    {
        uint nextDelayMs;
        bool addWorker;
        _threadAdjustmentLock.Acquire();
        try
        {
            nextDelayMs = PerformBlockingAdjustment(previousDelayElapsed, out addWorker);
        }
        finally
        {
            _threadAdjustmentLock.Release();
        }
        if (addWorker)
        {
            WorkerThread.MaybeAddWorkingWorker(this);
        }
        return nextDelayMs;
    }
    // Core blocking-adjustment logic; must run under _threadAdjustmentLock. Moves the thread
    // count goal toward TargetThreadsGoalForBlockingAdjustment, decreasing immediately or
    // increasing subject to delay/memory limits. Sets 'addWorker' when a worker should be
    // released after the lock is dropped. Returns the next delay in ms (0 = none pending).
    private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWorker)
    {
        _threadAdjustmentLock.VerifyIsLocked();
        Debug.Assert(_pendingBlockingAdjustment != PendingBlockingAdjustment.None);
        // Consume the pending request; it may be re-armed below if a delay is induced
        _pendingBlockingAdjustment = PendingBlockingAdjustment.None;
        addWorker = false;
        short targetThreadsGoal = TargetThreadsGoalForBlockingAdjustment;
        short numThreadsGoal = _separated.numThreadsGoal;
        if (numThreadsGoal == targetThreadsGoal)
        {
            return 0;
        }
        if (numThreadsGoal > targetThreadsGoal)
        {
            // The goal is only decreased by how much it was increased in total due to blocking adjustments. This is to
            // allow blocking adjustments to play well with starvation and hill climbing, either of which may increase the
            // goal independently for other reasons, and blocking adjustments should not undo those changes.
            if (_numThreadsAddedDueToBlocking <= 0)
            {
                return 0;
            }
            short toSubtract = Math.Min((short)(numThreadsGoal - targetThreadsGoal), _numThreadsAddedDueToBlocking);
            _numThreadsAddedDueToBlocking -= toSubtract;
            _separated.numThreadsGoal = numThreadsGoal -= toSubtract;
            HillClimbing.ThreadPoolHillClimber.ForceChange(
                numThreadsGoal,
                HillClimbing.StateOrTransition.CooperativeBlocking);
            return 0;
        }
        // Increasing the goal. Up to ThreadsToAddWithoutDelay threads beyond _minThreads may
        // be added without inducing a delay; the ushort casts avoid short overflow.
        short configuredMaxThreadsWithoutDelay =
            (short)Math.Min((ushort)(_minThreads + BlockingConfig.ThreadsToAddWithoutDelay), (ushort)_maxThreads);
        do
        {
            // Calculate how many threads can be added without a delay. Threads that were already created but may be just
            // waiting for work can be released for work without a delay, but creating a new thread may need a delay.
            ThreadCounts counts = _separated.counts;
            short maxThreadsGoalWithoutDelay =
                Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, _maxThreads));
            short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay);
            short newNumThreadsGoal;
            if (numThreadsGoal < targetThreadsGoalWithoutDelay)
            {
                newNumThreadsGoal = targetThreadsGoalWithoutDelay;
            }
            else if (previousDelayElapsed)
            {
                // One more thread per elapsed delay step
                newNumThreadsGoal = (short)(numThreadsGoal + 1);
            }
            else
            {
                // Need to induce a delay before adding a thread
                break;
            }
            do
            {
                if (newNumThreadsGoal <= counts.NumExistingThreads)
                {
                    break;
                }
                //
                // Threads would likely need to be created to compensate for blocking, so check memory usage and limits
                //
                long memoryLimitBytes = _memoryLimitBytes;
                if (memoryLimitBytes <= 0)
                {
                    break;
                }
                // Memory usage is updated after gen 2 GCs, and roughly represents how much physical memory was in use at
                // the time of the last gen 2 GC. When new threads are also blocking, they may not have used their typical
                // amount of stack space, and gen 2 GCs may not be happening to update the memory usage. Account for a bit
                // of extra stack space usage in the future for each thread.
                long memoryUsageBytes =
                    _memoryUsageBytes +
                    counts.NumExistingThreads * (long)WorkerThread.EstimatedAdditionalStackUsagePerThreadBytes;
                // The memory limit may already be less than the total amount of physical memory. We are only accounting for
                // thread pool worker threads above, and after fallback starvation may have to continue creating threads
                // slowly to prevent a deadlock, so calculate a threshold before falling back by giving the memory limit
                // some additional buffer.
                long memoryThresholdForFallbackBytes = memoryLimitBytes * 8 / 10;
                if (memoryUsageBytes >= memoryThresholdForFallbackBytes)
                {
                    return 0;
                }
                // Determine how many threads can be added without exceeding the memory threshold
                long achievableNumThreadsGoal =
                    counts.NumExistingThreads +
                    (memoryThresholdForFallbackBytes - memoryUsageBytes) /
                        WorkerThread.EstimatedAdditionalStackUsagePerThreadBytes;
                newNumThreadsGoal = (short)Math.Min(newNumThreadsGoal, achievableNumThreadsGoal);
                if (newNumThreadsGoal <= numThreadsGoal)
                {
                    return 0;
                }
            } while (false);
            // Record how much of the increase is attributable to blocking, so that only that
            // amount is undone later
            _numThreadsAddedDueToBlocking += (short)(newNumThreadsGoal - numThreadsGoal);
            _separated.numThreadsGoal = newNumThreadsGoal;
            HillClimbing.ThreadPoolHillClimber.ForceChange(
                newNumThreadsGoal,
                HillClimbing.StateOrTransition.CooperativeBlocking);
            // If all current workers are busy and work is queued, release/create a worker
            // (done by the caller outside the lock)
            if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0)
            {
                addWorker = true;
            }
            numThreadsGoal = newNumThreadsGoal;
            if (numThreadsGoal >= targetThreadsGoal)
            {
                return 0;
            }
        } while (false);
        // Calculate how much delay to induce before another thread is created. These operations don't overflow because of
        // limits on max thread count and max delays.
        _pendingBlockingAdjustment = PendingBlockingAdjustment.WithDelayIfNecessary;
        int delayStepCount = 1 + (numThreadsGoal - configuredMaxThreadsWithoutDelay) / BlockingConfig.ThreadsPerDelayStep;
        return Math.Min((uint)delayStepCount * BlockingConfig.DelayStepMs, BlockingConfig.MaxDelayMs);
    }
    // Kind of blocking adjustment requested of the gate thread
    private enum PendingBlockingAdjustment : byte
    {
        None,
        Immediately,
        WithDelayIfNecessary
    }
    // Configuration for cooperative-blocking compensation, read once from AppContext
    private static class BlockingConfig
    {
        public static readonly bool IsCooperativeBlockingEnabled =
            AppContextConfigHelper.GetBooleanConfig("System.Threading.ThreadPool.Blocking.CooperativeBlocking", true);
        public static readonly short ThreadsToAddWithoutDelay;
        public static readonly short ThreadsPerDelayStep;
        public static readonly uint DelayStepMs;
        public static readonly uint MaxDelayMs;
#pragma warning disable CA1810 // remove the explicit static constructor
        static BlockingConfig()
        {
            // Summary description of how blocking compensation works and how the config settings below are used:
            // - After the thread count based on MinThreads is reached, up to ThreadsToAddWithoutDelay additional threads
            //   may be created without a delay
            // - After that, before each additional thread is created, a delay is induced, starting with DelayStepMs
            // - For every ThreadsPerDelayStep threads that are added with a delay, an additional DelayStepMs is added to
            //   the delay
            // - The delay may not exceed MaxDelayMs
            // - Delays are only induced before creating threads. If threads are already available, they would be released
            //   without delay to compensate for cooperative blocking.
            // - Physical memory usage and limits are also used and beyond a threshold, the system switches to fallback mode
            //   where threads would be created if starvation is detected, typically with higher delays
            // After the thread count based on MinThreads is reached, this value (after it is multiplied by the processor
            // count) specifies how many additional threads may be created without a delay
            int blocking_threadsToAddWithoutDelay_procCountFactor =
                AppContextConfigHelper.GetInt32Config(
                    "System.Threading.ThreadPool.Blocking.ThreadsToAddWithoutDelay_ProcCountFactor",
                    1,
                    false);
            // After the thread count based on ThreadsToAddWithoutDelay is reached, this value (after it is multiplied by
            // the processor count) specifies after how many threads an additional DelayStepMs would be added to the delay
            // before each new thread is created
            int blocking_threadsPerDelayStep_procCountFactor =
                AppContextConfigHelper.GetInt32Config(
                    "System.Threading.ThreadPool.Blocking.ThreadsPerDelayStep_ProcCountFactor",
                    1,
                    false);
            // After the thread count based on ThreadsToAddWithoutDelay is reached, this value specifies how much additional
            // delay to add per ThreadsPerDelayStep threads, which would be applied before each new thread is created
            DelayStepMs =
                (uint)AppContextConfigHelper.GetInt32Config(
                    "System.Threading.ThreadPool.Blocking.DelayStepMs",
                    25,
                    false);
            // After the thread count based on ThreadsToAddWithoutDelay is reached, this value specifies the max delay to
            // use before each new thread is created
            MaxDelayMs =
                (uint)AppContextConfigHelper.GetInt32Config(
                    "System.Threading.ThreadPool.Blocking.MaxDelayMs",
                    250,
                    false);
            int processorCount = Environment.ProcessorCount;
            // Scale by processor count; the division check detects short-multiplication overflow
            ThreadsToAddWithoutDelay = (short)(processorCount * blocking_threadsToAddWithoutDelay_procCountFactor);
            if (ThreadsToAddWithoutDelay > MaxPossibleThreadCount ||
                ThreadsToAddWithoutDelay / processorCount != blocking_threadsToAddWithoutDelay_procCountFactor)
            {
                ThreadsToAddWithoutDelay = MaxPossibleThreadCount;
            }
            blocking_threadsPerDelayStep_procCountFactor = Math.Max(1, blocking_threadsPerDelayStep_procCountFactor);
            short maxThreadsPerDelayStep = (short)(MaxPossibleThreadCount - ThreadsToAddWithoutDelay);
            ThreadsPerDelayStep =
                (short)(processorCount * blocking_threadsPerDelayStep_procCountFactor);
            if (ThreadsPerDelayStep > maxThreadsPerDelayStep ||
                ThreadsPerDelayStep / processorCount != blocking_threadsPerDelayStep_procCountFactor)
            {
                ThreadsPerDelayStep = maxThreadsPerDelayStep;
            }
            // Clamp delays: at least 1 ms, and the max delay may not exceed the gate thread's
            // periodic activity interval
            MaxDelayMs = Math.Max(1, Math.Min(MaxDelayMs, GateThread.GateActivitiesPeriodMs));
            DelayStepMs = Math.Max(1, Math.Min(DelayStepMs, MaxDelayMs));
        }
#pragma warning restore CA1810
    }
}
}
......@@ -12,14 +12,14 @@ internal sealed partial class PortableThreadPool
{
private static class GateThread
{
private const int GateThreadDelayMs = 500;
private const int DequeueDelayThresholdMs = GateThreadDelayMs * 2;
public const uint GateActivitiesPeriodMs = 500;
private const uint DequeueDelayThresholdMs = GateActivitiesPeriodMs * 2;
private const int GateThreadRunningMask = 0x4;
private static readonly AutoResetEvent s_runGateThreadEvent = new AutoResetEvent(initialState: true);
private const int MaxRuns = 2;
private static readonly AutoResetEvent RunGateThreadEvent = new AutoResetEvent(initialState: true);
private static readonly AutoResetEvent DelayEvent = new AutoResetEvent(initialState: false);
private static void GateThreadStart()
{
bool disableStarvationDetection =
......@@ -33,16 +33,68 @@ private static void GateThreadStart()
_ = cpuUtilizationReader.CurrentUtilization;
PortableThreadPool threadPoolInstance = ThreadPoolInstance;
LowLevelLock hillClimbingThreadAdjustmentLock = threadPoolInstance._hillClimbingThreadAdjustmentLock;
LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
DelayHelper delayHelper = default;
if (BlockingConfig.IsCooperativeBlockingEnabled)
{
// Initialize memory usage and limits, and register to update them on gen 2 GCs
threadPoolInstance.OnGen2GCCallback();
Gen2GcCallback.Register(threadPoolInstance.OnGen2GCCallback);
}
while (true)
{
s_runGateThreadEvent.WaitOne();
RunGateThreadEvent.WaitOne();
int currentTimeMs = Environment.TickCount;
delayHelper.SetGateActivitiesTime(currentTimeMs);
bool needGateThreadForRuntime;
do
while (true)
{
Thread.Sleep(GateThreadDelayMs);
bool wasSignaledToWake = DelayEvent.WaitOne((int)delayHelper.GetNextDelay(currentTimeMs));
currentTimeMs = Environment.TickCount;
// Thread count adjustment for cooperative blocking
do
{
PendingBlockingAdjustment pendingBlockingAdjustment = threadPoolInstance._pendingBlockingAdjustment;
if (pendingBlockingAdjustment == PendingBlockingAdjustment.None)
{
delayHelper.ClearBlockingAdjustmentDelay();
break;
}
bool previousDelayElapsed = false;
if (delayHelper.HasBlockingAdjustmentDelay)
{
previousDelayElapsed =
delayHelper.HasBlockingAdjustmentDelayElapsed(currentTimeMs, wasSignaledToWake);
if (pendingBlockingAdjustment == PendingBlockingAdjustment.WithDelayIfNecessary &&
!previousDelayElapsed)
{
break;
}
}
uint nextDelayMs = threadPoolInstance.PerformBlockingAdjustment(previousDelayElapsed);
if (nextDelayMs <= 0)
{
delayHelper.ClearBlockingAdjustmentDelay();
}
else
{
delayHelper.SetBlockingAdjustmentTimeAndDelay(currentTimeMs, nextDelayMs);
}
} while (false);
//
// Periodic gate activities
//
if (!delayHelper.ShouldPerformGateActivities(currentTimeMs, wasSignaledToWake))
{
continue;
}
if (ThreadPool.EnableWorkerTracking && NativeRuntimeEventSource.Log.IsEnabled())
{
......@@ -53,17 +105,17 @@ private static void GateThreadStart()
int cpuUtilization = cpuUtilizationReader.CurrentUtilization;
threadPoolInstance._cpuUtilization = cpuUtilization;
needGateThreadForRuntime = ThreadPool.PerformRuntimeSpecificGateActivities(cpuUtilization);
bool needGateThreadForRuntime = ThreadPool.PerformRuntimeSpecificGateActivities(cpuUtilization);
if (!disableStarvationDetection &&
threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None &&
threadPoolInstance._separated.numRequestedWorkers > 0 &&
SufficientDelaySinceLastDequeue(threadPoolInstance))
{
bool addWorker = false;
threadAdjustmentLock.Acquire();
try
{
hillClimbingThreadAdjustmentLock.Acquire();
ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead();
// Don't add a thread if we're at max or if we are already in the process of adding threads.
// This logic is slightly different from the native implementation in CoreCLR because there are
// no retired threads. In the native implementation, when hill climbing reduces the thread count
......@@ -73,61 +125,67 @@ private static void GateThreadStart()
// stopped from working by hill climbing, so here the number of threads processing work, instead
// of the number of existing threads, is compared with the goal. There may be alternative
// solutions, for now this is only to maintain consistency in behavior.
while (
counts.NumExistingThreads < threadPoolInstance._maxThreads &&
counts.NumProcessingWork >= counts.NumThreadsGoal)
ThreadCounts counts = threadPoolInstance._separated.counts;
if (counts.NumProcessingWork < threadPoolInstance._maxThreads &&
counts.NumProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
{
if (debuggerBreakOnWorkStarvation)
{
Debugger.Break();
}
ThreadCounts newCounts = counts;
short newNumThreadsGoal = (short)(counts.NumProcessingWork + 1);
newCounts.NumThreadsGoal = newNumThreadsGoal;
ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
HillClimbing.ThreadPoolHillClimber.ForceChange(newNumThreadsGoal, HillClimbing.StateOrTransition.Starvation);
WorkerThread.MaybeAddWorkingWorker(threadPoolInstance);
break;
}
counts = oldCounts;
threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.Starvation);
addWorker = true;
}
}
finally
{
hillClimbingThreadAdjustmentLock.Release();
threadAdjustmentLock.Release();
}
if (addWorker)
{
WorkerThread.MaybeAddWorkingWorker(threadPoolInstance);
}
}
} while (
needGateThreadForRuntime ||
threadPoolInstance._separated.numRequestedWorkers > 0 ||
Interlocked.Decrement(ref threadPoolInstance._separated.gateThreadRunningState) > GetRunningStateForNumRuns(0));
if (!needGateThreadForRuntime &&
threadPoolInstance._separated.numRequestedWorkers <= 0 &&
threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None &&
Interlocked.Decrement(ref threadPoolInstance._separated.gateThreadRunningState) <= GetRunningStateForNumRuns(0))
{
break;
}
}
}
}
// Wakes the gate thread from its delay wait so that a pending blocking adjustment can be
// processed promptly, and ensures the gate thread is running at all. The event is set
// before EnsureRunning so that an already-running gate thread observes the wake.
public static void Wake(PortableThreadPool threadPoolInstance)
{
    DelayEvent.Set();
    EnsureRunning(threadPoolInstance);
}
// Called by the logic that spawns new worker threads; returns true if it has been too long
// since the last dequeue operation. Takes the number of worker threads into account
// when deciding what counts as "too long".
private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoolInstance)
{
int delay = Environment.TickCount - Volatile.Read(ref threadPoolInstance._separated.lastDequeueTime);
int minimumDelay;
uint delay = (uint)(Environment.TickCount - threadPoolInstance._separated.lastDequeueTime);
uint minimumDelay;
if (threadPoolInstance._cpuUtilization < CpuUtilizationLow)
{
minimumDelay = GateThreadDelayMs;
minimumDelay = GateActivitiesPeriodMs;
}
else
{
ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead();
int numThreads = counts.NumThreadsGoal;
minimumDelay = numThreads * DequeueDelayThresholdMs;
minimumDelay = (uint)threadPoolInstance._separated.numThreadsGoal * DequeueDelayThresholdMs;
}
return delay > minimumDelay;
}
......@@ -148,7 +206,7 @@ internal static void EnsureRunningSlow(PortableThreadPool threadPoolInstance)
int numRunsMask = Interlocked.Exchange(ref threadPoolInstance._separated.gateThreadRunningState, GetRunningStateForNumRuns(MaxRuns));
if (numRunsMask == GetRunningStateForNumRuns(0))
{
s_runGateThreadEvent.Set();
RunGateThreadEvent.Set();
}
else if ((numRunsMask & GateThreadRunningMask) == 0)
{
......@@ -188,6 +246,82 @@ private static void CreateGateThread(PortableThreadPool threadPoolInstance)
}
}
}
// Tracks the two timers the gate thread multiplexes over a single wait: the fixed-period
// gate activities and the variable blocking-adjustment delay. Elapsed-time math uses
// (uint)(now - then) so that Environment.TickCount wraparound is handled correctly.
private struct DelayHelper
{
    // Start time of the current gate-activities period (TickCount ms)
    private int _previousGateActivitiesTimeMs;
    // Start time of the current blocking-adjustment delay (TickCount ms)
    private int _previousBlockingAdjustmentDelayStartTimeMs;
    // Current blocking-adjustment delay in ms; 0 means no delay is pending
    private uint _previousBlockingAdjustmentDelayMs;
    // Which timer(s) the most recently computed wait was sized for; consulted after the
    // wait to decide what to do when the wait was not cut short by a wake signal
    private bool _runGateActivitiesAfterNextDelay;
    private bool _adjustForBlockingAfterNextDelay;
    public void SetGateActivitiesTime(int currentTimeMs)
    {
        _previousGateActivitiesTimeMs = currentTimeMs;
    }
    public void SetBlockingAdjustmentTimeAndDelay(int currentTimeMs, uint delayMs)
    {
        _previousBlockingAdjustmentDelayStartTimeMs = currentTimeMs;
        _previousBlockingAdjustmentDelayMs = delayMs;
    }
    public void ClearBlockingAdjustmentDelay() => _previousBlockingAdjustmentDelayMs = 0;
    public bool HasBlockingAdjustmentDelay => _previousBlockingAdjustmentDelayMs != 0;
    // Returns the number of milliseconds to wait before whichever of the two timers is due
    // sooner, and records which timer(s) that wait corresponds to. Returns at least 1 ms
    // when a timer is already overdue.
    public uint GetNextDelay(int currentTimeMs)
    {
        uint elapsedMsSincePreviousGateActivities = (uint)(currentTimeMs - _previousGateActivitiesTimeMs);
        uint nextDelayForGateActivities =
            elapsedMsSincePreviousGateActivities < GateActivitiesPeriodMs
                ? GateActivitiesPeriodMs - elapsedMsSincePreviousGateActivities
                : 1;
        if (_previousBlockingAdjustmentDelayMs == 0)
        {
            // No blocking-adjustment delay pending; wait only for gate activities
            _runGateActivitiesAfterNextDelay = true;
            _adjustForBlockingAfterNextDelay = false;
            return nextDelayForGateActivities;
        }
        uint elapsedMsSincePreviousBlockingAdjustmentDelay =
            (uint)(currentTimeMs - _previousBlockingAdjustmentDelayStartTimeMs);
        uint nextDelayForBlockingAdjustment =
            elapsedMsSincePreviousBlockingAdjustmentDelay < _previousBlockingAdjustmentDelayMs
                ? _previousBlockingAdjustmentDelayMs - elapsedMsSincePreviousBlockingAdjustmentDelay
                : 1;
        // Wait for the nearer deadline; both flags may be set when the deadlines coincide
        uint nextDelay = Math.Min(nextDelayForGateActivities, nextDelayForBlockingAdjustment);
        _runGateActivitiesAfterNextDelay = nextDelay == nextDelayForGateActivities;
        _adjustForBlockingAfterNextDelay = nextDelay == nextDelayForBlockingAdjustment;
        Debug.Assert(nextDelay <= GateActivitiesPeriodMs);
        return nextDelay;
    }
    // True when periodic gate activities should run now: either the last wait was sized for
    // them and fully elapsed (not cut short by a wake signal), or a full period has passed
    // regardless. Resets the period start time when returning true.
    public bool ShouldPerformGateActivities(int currentTimeMs, bool wasSignaledToWake)
    {
        bool result =
            (!wasSignaledToWake && _runGateActivitiesAfterNextDelay) ||
            (uint)(currentTimeMs - _previousGateActivitiesTimeMs) >= GateActivitiesPeriodMs;
        if (result)
        {
            SetGateActivitiesTime(currentTimeMs);
        }
        return result;
    }
    // True when the pending blocking-adjustment delay has elapsed, by the same two criteria
    // as above. Only valid while HasBlockingAdjustmentDelay is true.
    public bool HasBlockingAdjustmentDelayElapsed(int currentTimeMs, bool wasSignaledToWake)
    {
        Debug.Assert(HasBlockingAdjustmentDelay);
        if (!wasSignaledToWake && _adjustForBlockingAfterNextDelay)
        {
            return true;
        }
        uint elapsedMsSincePreviousBlockingAdjustmentDelay =
            (uint)(currentTimeMs - _previousBlockingAdjustmentDelayStartTimeMs);
        return elapsedMsSincePreviousBlockingAdjustmentDelay >= _previousBlockingAdjustmentDelayMs;
    }
}
}
internal static void EnsureGateThreadRunning() => GateThread.EnsureRunning(ThreadPoolInstance);
......
......@@ -33,6 +33,7 @@ public enum StateOrTransition
Stabilizing,
Starvation,
ThreadTimedOut,
CooperativeBlocking,
}
// SOS's ThreadPool command depends on the names of all fields
......@@ -321,10 +322,12 @@ public HillClimbing()
newThreadWaveMagnitude = Math.Max(newThreadWaveMagnitude, 1);
//
// Make sure our control setting is within the ThreadPool's limits
// Make sure our control setting is within the ThreadPool's limits. When some threads are blocked due to
// cooperative blocking, ensure that hill climbing does not decrease the thread count below the expected
// minimum.
//
int maxThreads = threadPoolInstance._maxThreads;
int minThreads = threadPoolInstance._minThreads;
int minThreads = threadPoolInstance.MinThreadsGoal;
_currentControlSetting = Math.Min(maxThreads - newThreadWaveMagnitude, _currentControlSetting);
_currentControlSetting = Math.Max(minThreads, _currentControlSetting);
......@@ -355,7 +358,11 @@ public HillClimbing()
// If all of this caused an actual change in thread count, log that as well.
//
if (newThreadCount != currentThreadCount)
{
ChangeThreadCount(newThreadCount, state);
_secondsElapsedSinceLastChange = 0;
_completionsSinceLastChange = 0;
}
//
// Return the new thread count and sample interval. This is randomized to prevent correlations with other periodic
......@@ -377,11 +384,14 @@ public HillClimbing()
private void ChangeThreadCount(int newThreadCount, StateOrTransition state)
{
_lastThreadCount = newThreadCount;
_currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1);
if (state != StateOrTransition.CooperativeBlocking) // this can be noisy
{
_currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1);
}
double throughput = _secondsElapsedSinceLastChange > 0 ? _completionsSinceLastChange / _secondsElapsedSinceLastChange : 0;
LogTransition(newThreadCount, throughput, state);
_secondsElapsedSinceLastChange = 0;
_completionsSinceLastChange = 0;
}
private void LogTransition(int newThreadCount, double throughput, StateOrTransition stateOrTransition)
......
......@@ -16,15 +16,14 @@ private struct ThreadCounts
// SOS's ThreadPool command depends on this layout
private const byte NumProcessingWorkShift = 0;
private const byte NumExistingThreadsShift = 16;
private const byte NumThreadsGoalShift = 32;
private ulong _data; // SOS's ThreadPool command depends on this name
private uint _data; // SOS's ThreadPool command depends on this name
private ThreadCounts(ulong data) => _data = data;
private ThreadCounts(uint data) => _data = data;
private short GetInt16Value(byte shift) => (short)(_data >> shift);
private void SetInt16Value(short value, byte shift) =>
_data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)(ushort)value << shift);
_data = (_data & ~((uint)ushort.MaxValue << shift)) | ((uint)(ushort)value << shift);
/// <summary>
/// Number of threads processing work items.
......@@ -44,7 +43,15 @@ public void SubtractNumProcessingWork(short value)
Debug.Assert(value >= 0);
Debug.Assert(value <= NumProcessingWork);
_data -= (ulong)(ushort)value << NumProcessingWorkShift;
_data -= (uint)(ushort)value << NumProcessingWorkShift;
}
// Atomically decrements NumProcessingWork. Relies on that field occupying the lowest
// 16 bits of _data (shift 0, asserted below), so a plain Interlocked.Decrement of the
// packed value decrements exactly that field; the second assert guards against a borrow
// into the adjacent bits if the count were already 0.
public void InterlockedDecrementNumProcessingWork()
{
    Debug.Assert(NumProcessingWorkShift == 0);
    ThreadCounts counts = new ThreadCounts(Interlocked.Decrement(ref _data));
    Debug.Assert(counts.NumProcessingWork >= 0);
}
/// <summary>
......@@ -65,20 +72,7 @@ public void SubtractNumExistingThreads(short value)
Debug.Assert(value >= 0);
Debug.Assert(value <= NumExistingThreads);
_data -= (ulong)(ushort)value << NumExistingThreadsShift;
}
/// <summary>
/// Max possible thread pool threads we want to have.
/// </summary>
public short NumThreadsGoal
{
get => GetInt16Value(NumThreadsGoalShift);
set
{
Debug.Assert(value > 0);
SetInt16Value(value, NumThreadsGoalShift);
}
_data -= (uint)(ushort)value << NumExistingThreadsShift;
}
public ThreadCounts VolatileRead() => new ThreadCounts(Volatile.Read(ref _data));
......@@ -90,7 +84,7 @@ public short NumThreadsGoal
public static bool operator !=(ThreadCounts lhs, ThreadCounts rhs) => lhs._data != rhs._data;
public override bool Equals([NotNullWhen(true)] object? obj) => obj is ThreadCounts other && _data == other._data;
public override int GetHashCode() => (int)_data + (int)(_data >> 32);
public override int GetHashCode() => (int)_data;
}
}
}
......@@ -12,6 +12,11 @@ internal sealed partial class PortableThreadPool
/// </summary>
private static class WorkerThread
{
// This value represents an assumption of how much uncommitted stack space a worker thread may use in the future.
// Used in calculations to estimate when to throttle the rate of thread injection, to reduce the possibility of
// preexisting threads running out of memory when using new stack space in low-memory situations.
public const int EstimatedAdditionalStackUsagePerThreadBytes = 64 << 10; // 64 KB
/// <summary>
/// Semaphore for controlling how many threads are currently working.
/// </summary>
......@@ -43,7 +48,7 @@ private static void WorkerThreadStart()
(uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
}
LowLevelLock hillClimbingThreadAdjustmentLock = threadPoolInstance._hillClimbingThreadAdjustmentLock;
LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
LowLevelLifoSemaphore semaphore = s_semaphore;
while (true)
......@@ -54,7 +59,7 @@ private static void WorkerThreadStart()
bool alreadyRemovedWorkingWorker = false;
while (TakeActiveRequest(threadPoolInstance))
{
Volatile.Write(ref threadPoolInstance._separated.lastDequeueTime, Environment.TickCount);
threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
if (!ThreadPoolWorkQueue.Dispatch())
{
// ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have
......@@ -96,14 +101,14 @@ private static void WorkerThreadStart()
}
}
hillClimbingThreadAdjustmentLock.Acquire();
threadAdjustmentLock.Acquire();
try
{
// At this point, the thread's wait timed out. We are shutting down this thread.
// We are going to decrement the number of existing threads to no longer include this one
// and then change the max number of threads in the thread pool to reflect that we don't need as many
// as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead();
ThreadCounts counts = threadPoolInstance._separated.counts;
while (true)
{
// Since this thread is currently registered as an existing thread, if more work comes in meanwhile,
......@@ -119,13 +124,21 @@ private static void WorkerThreadStart()
ThreadCounts newCounts = counts;
newCounts.SubtractNumExistingThreads(1);
short newNumExistingThreads = (short)(numExistingThreads - 1);
short newNumThreadsGoal = Math.Max(threadPoolInstance._minThreads, Math.Min(newNumExistingThreads, newCounts.NumThreadsGoal));
newCounts.NumThreadsGoal = newNumThreadsGoal;
ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
HillClimbing.ThreadPoolHillClimber.ForceChange(newNumThreadsGoal, HillClimbing.StateOrTransition.ThreadTimedOut);
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, threadPoolInstance._separated.numThreadsGoal));
if (threadPoolInstance._separated.numThreadsGoal != newNumThreadsGoal)
{
threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
}
if (NativeRuntimeEventSource.Log.IsEnabled())
{
......@@ -139,7 +152,7 @@ private static void WorkerThreadStart()
}
finally
{
hillClimbingThreadAdjustmentLock.Release();
threadAdjustmentLock.Release();
}
}
}
......@@ -149,19 +162,7 @@ private static void WorkerThreadStart()
/// </summary>
private static void RemoveWorkingWorker(PortableThreadPool threadPoolInstance)
{
ThreadCounts currentCounts = threadPoolInstance._separated.counts.VolatileRead();
while (true)
{
ThreadCounts newCounts = currentCounts;
newCounts.SubtractNumProcessingWork(1);
ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, currentCounts);
if (oldCounts == currentCounts)
{
break;
}
currentCounts = oldCounts;
}
threadPoolInstance._separated.counts.InterlockedDecrementNumProcessingWork();
// It's possible that we decided we had thread requests just before a request came in,
// but reduced the worker count *after* the request came in. In this case, we might
......@@ -175,12 +176,12 @@ private static void RemoveWorkingWorker(PortableThreadPool threadPoolInstance)
internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance)
{
ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead();
ThreadCounts counts = threadPoolInstance._separated.counts;
short numExistingThreads, numProcessingWork, newNumExistingThreads, newNumProcessingWork;
while (true)
{
numProcessingWork = counts.NumProcessingWork;
if (numProcessingWork >= counts.NumThreadsGoal)
if (numProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
{
return;
}
......@@ -219,7 +220,7 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance
continue;
}
counts = threadPoolInstance._separated.counts.VolatileRead();
counts = threadPoolInstance._separated.counts;
while (true)
{
ThreadCounts newCounts = counts;
......@@ -245,17 +246,17 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance
/// <returns>Whether or not this thread should stop processing work even if there is still work in the queue.</returns>
internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolInstance)
{
ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead();
ThreadCounts counts = threadPoolInstance._separated.counts;
while (true)
{
// When there are more threads processing work than the thread count goal, hill climbing must have decided
// When there are more threads processing work than the thread count goal, it may have been decided
// to decrease the number of threads. Stop processing if the counts can be updated. We may have more
// threads existing than the thread count goal and that is ok, the cold ones will eventually time out if
// the thread count goal is not increased again. This logic is a bit different from the original CoreCLR
// code from which this implementation was ported, which turns a processing thread into a retired thread
// and checks for pending requests like RemoveWorkingWorker. In this implementation there are
// no retired threads, so only the count of threads processing work is considered.
if (counts.NumProcessingWork <= counts.NumThreadsGoal)
if (counts.NumProcessingWork <= threadPoolInstance._separated.numThreadsGoal)
{
return false;
}
......
......@@ -28,8 +28,10 @@ internal sealed partial class PortableThreadPool
private const int CpuUtilizationHigh = 95;
private const int CpuUtilizationLow = 80;
private static readonly short s_forcedMinWorkerThreads = AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MinThreads", 0, false);
private static readonly short s_forcedMaxWorkerThreads = AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MaxThreads", 0, false);
private static readonly short ForcedMinWorkerThreads =
AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MinThreads", 0, false);
private static readonly short ForcedMaxWorkerThreads =
AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MaxThreads", 0, false);
[ThreadStatic]
private static object? t_completionCountObject;
......@@ -43,43 +45,55 @@ internal sealed partial class PortableThreadPool
private int _cpuUtilization; // SOS's ThreadPool command depends on this name
private short _minThreads;
private short _maxThreads;
private readonly LowLevelLock _maxMinThreadLock = new LowLevelLock();
[StructLayout(LayoutKind.Explicit, Size = Internal.PaddingHelpers.CACHE_LINE_SIZE * 6)]
private struct CacheLineSeparated
{
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1)]
public ThreadCounts counts; // SOS's ThreadPool command depends on this name
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1 + sizeof(uint))]
public short numThreadsGoal;
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)]
public int lastDequeueTime;
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3)]
public int priorCompletionCount;
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3 + sizeof(int))]
public int priorCompletedWorkRequestsTime;
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3 + sizeof(int) * 2)]
public int nextCompletedWorkRequestsTime;
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4)]
public volatile int numRequestedWorkers;
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 5)]
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4 + sizeof(int))]
public int gateThreadRunningState;
}
private CacheLineSeparated _separated; // SOS's ThreadPool command depends on this name
private long _currentSampleStartTime;
private readonly ThreadInt64PersistentCounter _completionCounter = new ThreadInt64PersistentCounter();
private int _threadAdjustmentIntervalMs;
private readonly LowLevelLock _hillClimbingThreadAdjustmentLock = new LowLevelLock();
private short _numBlockedThreads;
private short _numThreadsAddedDueToBlocking;
private PendingBlockingAdjustment _pendingBlockingAdjustment;
private long _memoryUsageBytes;
private long _memoryLimitBytes;
private readonly LowLevelLock _threadAdjustmentLock = new LowLevelLock();
private CacheLineSeparated _separated; // SOS's ThreadPool command depends on this name
private PortableThreadPool()
{
_minThreads = s_forcedMinWorkerThreads > 0 ? s_forcedMinWorkerThreads : (short)Environment.ProcessorCount;
_minThreads = ForcedMinWorkerThreads > 0 ? ForcedMinWorkerThreads : (short)Environment.ProcessorCount;
if (_minThreads > MaxPossibleThreadCount)
{
_minThreads = MaxPossibleThreadCount;
}
_maxThreads = s_forcedMaxWorkerThreads > 0 ? s_forcedMaxWorkerThreads : DefaultMaxWorkerThreadCount;
_maxThreads = ForcedMaxWorkerThreads > 0 ? ForcedMaxWorkerThreads : DefaultMaxWorkerThreadCount;
if (_maxThreads > MaxPossibleThreadCount)
{
_maxThreads = MaxPossibleThreadCount;
......@@ -89,13 +103,7 @@ private PortableThreadPool()
_maxThreads = _minThreads;
}
_separated = new CacheLineSeparated
{
counts = new ThreadCounts
{
NumThreadsGoal = _minThreads
}
};
_separated.numThreadsGoal = _minThreads;
}
public bool SetMinThreads(int workerThreads, int ioCompletionThreads)
......@@ -105,7 +113,10 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads)
return false;
}
_maxMinThreadLock.Acquire();
bool addWorker = false;
bool wakeGateThread = false;
_threadAdjustmentLock.Acquire();
try
{
if (workerThreads > _maxThreads || !ThreadPool.CanSetMinIOCompletionThreads(ioCompletionThreads))
......@@ -115,39 +126,45 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads)
ThreadPool.SetMinIOCompletionThreads(ioCompletionThreads);
if (s_forcedMinWorkerThreads != 0)
if (ForcedMinWorkerThreads != 0)
{
return true;
}
short newMinThreads = (short)Math.Max(1, Math.Min(workerThreads, MaxPossibleThreadCount));
_minThreads = newMinThreads;
ThreadCounts counts = _separated.counts.VolatileRead();
while (counts.NumThreadsGoal < newMinThreads)
if (_numBlockedThreads > 0)
{
ThreadCounts newCounts = counts;
newCounts.NumThreadsGoal = newMinThreads;
ThreadCounts oldCounts = _separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
// Blocking adjustment will adjust the goal according to its heuristics
if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately)
{
if (_separated.numRequestedWorkers > 0)
{
WorkerThread.MaybeAddWorkingWorker(this);
}
break;
_pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately;
wakeGateThread = true;
}
}
else if (_separated.numThreadsGoal < newMinThreads)
{
_separated.numThreadsGoal = newMinThreads;
if (_separated.numRequestedWorkers > 0)
{
addWorker = true;
}
counts = oldCounts;
}
return true;
}
finally
{
_maxMinThreadLock.Release();
_threadAdjustmentLock.Release();
}
if (addWorker)
{
WorkerThread.MaybeAddWorkingWorker(this);
}
else if (wakeGateThread)
{
GateThread.Wake(this);
}
return true;
}
public int GetMinThreads() => Volatile.Read(ref _minThreads);
......@@ -159,7 +176,7 @@ public bool SetMaxThreads(int workerThreads, int ioCompletionThreads)
return false;
}
_maxMinThreadLock.Acquire();
_threadAdjustmentLock.Acquire();
try
{
if (workerThreads < _minThreads || !ThreadPool.CanSetMaxIOCompletionThreads(ioCompletionThreads))
......@@ -169,34 +186,22 @@ public bool SetMaxThreads(int workerThreads, int ioCompletionThreads)
ThreadPool.SetMaxIOCompletionThreads(ioCompletionThreads);
if (s_forcedMaxWorkerThreads != 0)
if (ForcedMaxWorkerThreads != 0)
{
return true;
}
short newMaxThreads = (short)Math.Min(workerThreads, MaxPossibleThreadCount);
_maxThreads = newMaxThreads;
ThreadCounts counts = _separated.counts.VolatileRead();
while (counts.NumThreadsGoal > newMaxThreads)
if (_separated.numThreadsGoal > newMaxThreads)
{
ThreadCounts newCounts = counts;
newCounts.NumThreadsGoal = newMaxThreads;
ThreadCounts oldCounts = _separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
break;
}
counts = oldCounts;
_separated.numThreadsGoal = newMaxThreads;
}
return true;
}
finally
{
_maxMinThreadLock.Release();
_threadAdjustmentLock.Release();
}
}
......@@ -232,7 +237,7 @@ private object CreateThreadLocalCompletionCountObject()
private void NotifyWorkItemProgress(object threadLocalCompletionCountObject, int currentTimeMs)
{
ThreadInt64PersistentCounter.Increment(threadLocalCompletionCountObject);
Volatile.Write(ref _separated.lastDequeueTime, Environment.TickCount);
_separated.lastDequeueTime = currentTimeMs;
if (ShouldAdjustMaxWorkersActive(currentTimeMs))
{
......@@ -257,15 +262,23 @@ internal bool NotifyWorkItemComplete(object? threadLocalCompletionCountObject, i
//
private void AdjustMaxWorkersActive()
{
LowLevelLock hillClimbingThreadAdjustmentLock = _hillClimbingThreadAdjustmentLock;
if (!hillClimbingThreadAdjustmentLock.TryAcquire())
LowLevelLock threadAdjustmentLock = _threadAdjustmentLock;
if (!threadAdjustmentLock.TryAcquire())
{
// The lock is held by someone else, they will take care of this for us
return;
}
bool addWorker = false;
try
{
// Skip hill climbing when there is a pending blocking adjustment. Hill climbing may otherwise bypass the
// blocking adjustment heuristics and increase the thread count too quickly.
if (_pendingBlockingAdjustment != PendingBlockingAdjustment.None)
{
return;
}
long startTime = _currentSampleStartTime;
long endTime = Stopwatch.GetTimestamp();
long freq = Stopwatch.Frequency;
......@@ -278,39 +291,24 @@ private void AdjustMaxWorkersActive()
int totalNumCompletions = (int)_completionCounter.Count;
int numCompletions = totalNumCompletions - _separated.priorCompletionCount;
ThreadCounts currentCounts = _separated.counts.VolatileRead();
int newMax;
(newMax, _threadAdjustmentIntervalMs) = HillClimbing.ThreadPoolHillClimber.Update(currentCounts.NumThreadsGoal, elapsedSeconds, numCompletions);
while (newMax != currentCounts.NumThreadsGoal)
int newNumThreadsGoal;
(newNumThreadsGoal, _threadAdjustmentIntervalMs) =
HillClimbing.ThreadPoolHillClimber.Update(_separated.numThreadsGoal, elapsedSeconds, numCompletions);
short oldNumThreadsGoal = _separated.numThreadsGoal;
if (oldNumThreadsGoal != (short)newNumThreadsGoal)
{
ThreadCounts newCounts = currentCounts;
newCounts.NumThreadsGoal = (short)newMax;
ThreadCounts oldCounts = _separated.counts.InterlockedCompareExchange(newCounts, currentCounts);
if (oldCounts == currentCounts)
{
//
// If we're increasing the max, inject a thread. If that thread finds work, it will inject
// another thread, etc., until nobody finds work or we reach the new maximum.
//
// If we're reducing the max, whichever threads notice this first will sleep and timeout themselves.
//
if (newMax > oldCounts.NumThreadsGoal)
{
WorkerThread.MaybeAddWorkingWorker(this);
}
break;
}
if (oldCounts.NumThreadsGoal > currentCounts.NumThreadsGoal && oldCounts.NumThreadsGoal >= newMax)
_separated.numThreadsGoal = (short)newNumThreadsGoal;
//
// If we're increasing the goal, inject a thread. If that thread finds work, it will inject
// another thread, etc., until nobody finds work or we reach the new goal.
//
// If we're reducing the goal, whichever threads notice this first will sleep and timeout themselves.
//
if (newNumThreadsGoal > oldNumThreadsGoal)
{
// someone (probably the gate thread) increased the thread count more than
// we are about to do. Don't interfere.
break;
addWorker = true;
}
currentCounts = oldCounts;
}
_separated.priorCompletionCount = totalNumCompletions;
......@@ -321,12 +319,22 @@ private void AdjustMaxWorkersActive()
}
finally
{
hillClimbingThreadAdjustmentLock.Release();
threadAdjustmentLock.Release();
}
if (addWorker)
{
WorkerThread.MaybeAddWorkingWorker(this);
}
}
private bool ShouldAdjustMaxWorkersActive(int currentTimeMs)
{
if (HillClimbing.IsDisabled)
{
return false;
}
// We need to subtract by prior time because Environment.TickCount can wrap around, making a comparison of absolute
// times unreliable. Intervals are unsigned to avoid wrapping around on the subtract after enough time elapses, and
// this also prevents the initial elapsed interval from being negative due to the prior and next times being
......@@ -334,19 +342,26 @@ private bool ShouldAdjustMaxWorkersActive(int currentTimeMs)
int priorTime = Volatile.Read(ref _separated.priorCompletedWorkRequestsTime);
uint requiredInterval = (uint)(_separated.nextCompletedWorkRequestsTime - priorTime);
uint elapsedInterval = (uint)(currentTimeMs - priorTime);
if (elapsedInterval >= requiredInterval)
if (elapsedInterval < requiredInterval)
{
return false;
}
// Avoid trying to adjust the thread count goal if there are already more threads than the thread count goal.
// In that situation, hill climbing must have previously decided to decrease the thread count goal, so let's
// wait until the system responds to that change before calling into hill climbing again. This condition should
// be the opposite of the condition in WorkerThread.ShouldStopProcessingWorkNow that causes
// threads processing work to stop in response to a decreased thread count goal. The logic here is a bit
// different from the original CoreCLR code from which this implementation was ported because in this
// implementation there are no retired threads, so only the count of threads processing work is considered.
if (_separated.counts.NumProcessingWork > _separated.numThreadsGoal)
{
// Avoid trying to adjust the thread count goal if there are already more threads than the thread count goal.
// In that situation, hill climbing must have previously decided to decrease the thread count goal, so let's
// wait until the system responds to that change before calling into hill climbing again. This condition should
// be the opposite of the condition in WorkerThread.ShouldStopProcessingWorkNow that causes
// threads processing work to stop in response to a decreased thread count goal. The logic here is a bit
// different from the original CoreCLR code from which this implementation was ported because in this
// implementation there are no retired threads, so only the count of threads processing work is considered.
ThreadCounts counts = _separated.counts.VolatileRead();
return counts.NumProcessingWork <= counts.NumThreadsGoal && !HillClimbing.IsDisabled;
return false;
}
return false;
// Skip hill climbing when there is a pending blocking adjustment. Hill climbing may otherwise bypass the
// blocking adjustment heuristics and increase the thread count too quickly.
return _pendingBlockingAdjustment == PendingBlockingAdjustment.None;
}
internal void RequestWorker()
......@@ -357,5 +372,16 @@ internal void RequestWorker()
WorkerThread.MaybeAddWorkingWorker(this);
GateThread.EnsureRunning(this);
}
private bool OnGen2GCCallback()
{
    // Refresh the cached memory snapshot consumed by blocking adjustment's fallback logic.
    // Gen 2 GCs may be very infrequent in some cases. If that becomes an issue, consider
    // updating the memory usage more frequently. Since these values are only used for
    // fallback purposes in blocking adjustment, an artificially high memory usage merely
    // causes blocking adjustment to fall back to slower adjustments sooner than necessary.
    GCMemoryInfo memoryInfo = GC.GetGCMemoryInfo();
    long limitBytes = memoryInfo.HighMemoryLoadThresholdBytes;
    long usageBytes = memoryInfo.MemoryLoadBytes;
    if (usageBytes > limitBytes)
    {
        // Clamp usage to the limit so downstream ratios stay within [0, 1].
        usageBytes = limitBytes;
    }

    _memoryLimitBytes = limitBytes;
    _memoryUsageBytes = usageBytes;

    return true; // continue receiving gen 2 GC callbacks
}
}
}
......@@ -2973,14 +2973,36 @@ private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken can
#pragma warning disable CA1416 // Validate platform compatibility, issue: https://github.com/dotnet/runtime/issues/44622
if (infiniteWait)
{
returnValue = mres.Wait(Timeout.Infinite, cancellationToken);
bool notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked();
try
{
returnValue = mres.Wait(Timeout.Infinite, cancellationToken);
}
finally
{
if (notifyWhenUnblocked)
{
ThreadPool.NotifyThreadUnblocked();
}
}
}
else
{
uint elapsedTimeTicks = ((uint)Environment.TickCount) - startTimeTicks;
if (elapsedTimeTicks < millisecondsTimeout)
{
returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken);
bool notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked();
try
{
returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken);
}
finally
{
if (notifyWhenUnblocked)
{
ThreadPool.NotifyThreadUnblocked();
}
}
}
}
#pragma warning restore CA1416
......
......@@ -100,6 +100,9 @@ public static void GetAvailableThreads(out int workerThreads, out int completion
internal static bool NotifyWorkItemComplete(object? threadLocalCompletionCountObject, int currentTimeMs) =>
PortableThreadPool.ThreadPoolInstance.NotifyWorkItemComplete(threadLocalCompletionCountObject, currentTimeMs);
// Forwards sync-over-async blocking notifications to the portable thread pool (see the
// callers in Task's SpinThenBlockingWait). The return value of NotifyThreadBlocked
// indicates whether the caller should later pair it with a NotifyThreadUnblocked call.
internal static bool NotifyThreadBlocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked();

internal static void NotifyThreadUnblocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked();
internal static object GetOrCreateThreadLocalCompletionCountObject() =>
PortableThreadPool.ThreadPoolInstance.GetOrCreateThreadLocalCompletionCountObject();
......
......@@ -725,7 +725,12 @@ internal static bool Dispatch()
//
int currentTickCount = Environment.TickCount;
if (!ThreadPool.NotifyWorkItemComplete(threadLocalCompletionCountObject, currentTickCount))
{
// This thread is being parked and may remain inactive for a while. Transfer any thread-local work items
// to ensure that they would not be heavily delayed.
tl.TransferLocalWork();
return false;
}
// Check if the dispatch quantum has expired
if ((uint)(currentTickCount - startTickCount) < DispatchQuantumMs)
......@@ -824,21 +829,20 @@ public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
threadLocalCompletionCountObject = ThreadPool.GetOrCreateThreadLocalCompletionCountObject();
}
public void TransferLocalWork()
{
    // Drain this thread's local work-stealing queue into the global queue so that other
    // threads can pick up the work items.
    object? workItem;
    while ((workItem = workStealingQueue.LocalPop()) != null)
    {
        workQueue.Enqueue(workItem, forceGlobal: true);
    }
}
~ThreadPoolWorkQueueThreadLocals()
{
// Transfer any pending workitems into the global queue so that they will be executed by another thread
if (null != workStealingQueue)
{
if (null != workQueue)
{
object? cb;
while ((cb = workStealingQueue.LocalPop()) != null)
{
Debug.Assert(null != cb);
workQueue.Enqueue(cb, forceGlobal: true);
}
}
TransferLocalWork();
ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
}
}
......
......@@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using System.Threading.Tests;
using Microsoft.DotNet.RemoteExecutor;
......@@ -885,6 +886,69 @@ public static void ThreadPoolThreadCreationDoesNotTransferExecutionContext()
}).Dispose();
}
[ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))]
public static void CooperativeBlockingCanCreateThreadsFaster()
{
    // Run in a separate process to test in a clean thread pool environment such that work items queued by the test
    // would cause the thread pool to create threads
    RemoteExecutor.Invoke(() =>
    {
        // All but the last of these work items will block and the last queued work item would release the blocking.
        // Without cooperative blocking, this would lead to starvation after <proc count> work items run. Since
        // starvation adds threads at a rate of at most 2 per second, the extra 120 work items would take roughly 60
        // seconds to get unblocked and since the test waits for 30 seconds it would time out. Cooperative blocking is
        // configured below to increase the rate of thread injection for testing purposes while getting a decent amount
        // of coverage for its behavior. With cooperative blocking as configured below, the test should finish within a
        // few seconds.
        int processorCount = Environment.ProcessorCount;
        int workItemCount = processorCount + 120;
        SetBlockingConfigValue("ThreadsToAddWithoutDelay_ProcCountFactor", 1);
        SetBlockingConfigValue("MaxDelayMs", 1);

        var allWorkItemsUnblocked = new AutoResetEvent(false);

        // Run a second iteration for some extra coverage. Iterations after the first one would be much faster because
        // the necessary number of threads would already have been created by then, and would not add much to the test
        // time.
        for (int iterationIndex = 0; iterationIndex < 2; ++iterationIndex)
        {
            var tcs = new TaskCompletionSource<int>();
            int unblockedThreadCount = 0;

            // Each blocking work item waits synchronously (sync-over-async) until the unblocking
            // work item completes the task-completion source.
            Action<int> blockingWorkItem = _ =>
            {
                tcs.Task.Wait();
                if (Interlocked.Increment(ref unblockedThreadCount) == workItemCount - 1)
                {
                    allWorkItemsUnblocked.Set();
                }
            };

            for (int i = 0; i < workItemCount - 1; ++i)
            {
                ThreadPool.UnsafeQueueUserWorkItem(blockingWorkItem, 0, preferLocal: false);
            }

            // The last work item releases all of the blocked work items at once.
            Action<int> unblockingWorkItem = _ => tcs.SetResult(0);
            ThreadPool.UnsafeQueueUserWorkItem(unblockingWorkItem, 0, preferLocal: false);
            Assert.True(allWorkItemsUnblocked.WaitOne(30_000));
        }

        // Config names are read by the blocking-adjustment heuristics under the
        // "System.Threading.ThreadPool.Blocking." prefix.
        void SetBlockingConfigValue(string name, int value) =>
            AppContextSetData("System.Threading.ThreadPool.Blocking." + name, value);

        // AppContext.SetData is invoked via reflection here — presumably it is not exposed
        // through the test's reference assemblies; TODO confirm.
        void AppContextSetData(string name, object value)
        {
            typeof(AppContext).InvokeMember(
                "SetData",
                BindingFlags.ExactBinding | BindingFlags.InvokeMethod | BindingFlags.Public | BindingFlags.Static,
                null,
                null,
                new object[] { name, value });
        }
    }).Dispose();
}
public static bool IsThreadingAndRemoteExecutorSupported
{
    get
    {
        // These tests create threads and launch a remote process; both capabilities must be available.
        return PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported;
    }
}
}
......
......@@ -90,6 +90,12 @@ internal static bool NotifyWorkItemComplete(object? threadLocalCompletionCountOb
return true;
}
// Stubs: this thread pool implementation does not track blocked threads. Returning false
// tells the caller that no matching NotifyThreadUnblocked call is needed.
internal static bool NotifyThreadBlocked() => false;

internal static void NotifyThreadUnblocked()
{
    // No-op: blocking notifications are not acted upon in this implementation.
}
internal static object? GetOrCreateThreadLocalCompletionCountObject() => null;
private static RegisteredWaitHandle RegisterWaitForSingleObject(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册