未验证 提交 117d31a6 编写于 作者: B Brennan 提交者: GitHub

Cleanup idle rate limiters (#69677)

上级 41c6772c
......@@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
......@@ -33,6 +34,7 @@ public static class PartitionedRateLimiter
internal sealed class DefaultPartitionedRateLimiter<TResource, TKey> : PartitionedRateLimiter<TResource> where TKey : notnull
{
private readonly Func<TResource, RateLimitPartition<TKey>> _partitioner;
private static TimeSpan _idleTimeLimit = TimeSpan.FromSeconds(10);
// TODO: Look at ConcurrentDictionary to try and avoid a global lock
private Dictionary<TKey, Lazy<RateLimiter>> _limiters;
......@@ -42,8 +44,9 @@ internal sealed class DefaultPartitionedRateLimiter<TResource, TKey> : Partition
// Used by the Timer to call TryRelenish on ReplenishingRateLimiters
// We use a separate list to avoid running TryReplenish (which might be user code) inside our lock
// And we cache the list to amortize the allocation cost to as close to 0 as we can get
private List<Lazy<RateLimiter>> _cachedLimiters = new();
private List<KeyValuePair<TKey, Lazy<RateLimiter>>> _cachedLimiters = new();
private bool _cacheInvalid;
private List<RateLimiter> _limitersToDispose = new();
private TimerAwaitable _timer;
private Task _timerTask;
......@@ -68,7 +71,7 @@ private async Task RunTimer()
{
try
{
Replenish(this);
await Heartbeat().ConfigureAwait(false);
}
// TODO: Can we log to EventSource or somewhere? Maybe dispatch throwing the exception so it is at least an unhandled exception?
catch { }
......@@ -128,14 +131,29 @@ protected override void Dispose(bool disposing)
return;
}
List<Exception>? exceptions = null;
// Safe to access _limiters outside the lock
// The timer is no longer running and _disposed is set so anyone trying to access fields will be checking that first
foreach (KeyValuePair<TKey, Lazy<RateLimiter>> limiter in _limiters)
{
limiter.Value.Value.Dispose();
try
{
limiter.Value.Value.Dispose();
}
catch (Exception ex)
{
exceptions ??= new List<Exception>();
exceptions.Add(ex);
}
}
_limiters.Clear();
_disposeComplete.TrySetResult(null);
if (exceptions is not null)
{
throw new AggregateException(exceptions);
}
}
protected override async ValueTask DisposeAsyncCore()
......@@ -151,12 +169,26 @@ protected override async ValueTask DisposeAsyncCore()
return;
}
List<Exception>? exceptions = null;
foreach (KeyValuePair<TKey, Lazy<RateLimiter>> limiter in _limiters)
{
await limiter.Value.Value.DisposeAsync().ConfigureAwait(false);
try
{
await limiter.Value.Value.DisposeAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
exceptions ??= new List<Exception>();
exceptions.Add(ex);
}
}
_limiters.Clear();
_disposeComplete.TrySetResult(null);
if (exceptions is not null)
{
throw new AggregateException(exceptions);
}
}
// This handles the common state changes that Dispose and DisposeAsync need to do, the individual limiters still need to be Disposed after this call
......@@ -182,48 +214,82 @@ private void ThrowIfDisposed()
}
}
private static void Replenish(DefaultPartitionedRateLimiter<TResource, TKey> limiter)
private async Task Heartbeat()
{
lock (limiter.Lock)
lock (Lock)
{
if (limiter._disposed)
if (_disposed)
{
return;
}
// If the cache has been invalidated we need to recreate it
if (limiter._cacheInvalid)
if (_cacheInvalid)
{
_cachedLimiters.Clear();
_cachedLimiters.AddRange(_limiters);
}
}
List<Exception>? aggregateExceptions = null;
// cachedLimiters is safe to use outside the lock because it is only updated by the Timer
foreach (KeyValuePair<TKey, Lazy<RateLimiter>> rateLimiter in _cachedLimiters)
{
if (!rateLimiter.Value.IsValueCreated)
{
limiter._cachedLimiters.Clear();
bool cacheStillInvalid = false;
foreach (KeyValuePair<TKey, Lazy<RateLimiter>> kvp in limiter._limiters)
continue;
}
if (rateLimiter.Value.Value.IdleDuration is TimeSpan idleDuration && idleDuration > _idleTimeLimit)
{
lock (Lock)
{
if (kvp.Value.IsValueCreated)
{
if (kvp.Value.Value is ReplenishingRateLimiter)
{
limiter._cachedLimiters.Add(kvp.Value);
}
}
else
// Check time again under lock to make sure no one calls Acquire or WaitAsync after checking the time and removing the limiter
idleDuration = rateLimiter.Value.Value.IdleDuration ?? TimeSpan.Zero;
if (idleDuration > _idleTimeLimit)
{
// In rare cases the RateLimiter will be added to the storage but not be initialized yet
// keep cache invalid if there was a non-initialized RateLimiter
// the next time we run the timer the cache will be updated
// with the initialized RateLimiter
cacheStillInvalid = true;
// Remove limiter from the lookup table and mark cache as invalid
// If a request for this partition comes in it will have to create a new limiter now
// And the next time the timer runs the cache needs to be updated to no longer have a reference to this limiter
_cacheInvalid = true;
_limiters.Remove(rateLimiter.Key);
// We don't want to dispose inside the lock so we need to defer it
_limitersToDispose.Add(rateLimiter.Value.Value);
}
}
limiter._cacheInvalid = cacheStillInvalid;
}
else if (rateLimiter.Value.Value is ReplenishingRateLimiter replenishingRateLimiter)
{
try
{
replenishingRateLimiter.TryReplenish();
}
catch (Exception ex)
{
aggregateExceptions ??= new List<Exception>();
aggregateExceptions.Add(ex);
}
}
}
// cachedLimiters is safe to use outside the lock because it is only updated by the Timer
// and the Timer avoids re-entrancy issues via the _executingTimer field
foreach (Lazy<RateLimiter> rateLimiter in limiter._cachedLimiters)
foreach (RateLimiter limiter in _limitersToDispose)
{
try
{
await limiter.DisposeAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
aggregateExceptions ??= new List<Exception>();
aggregateExceptions.Add(ex);
}
}
_limitersToDispose.Clear();
if (aggregateExceptions is not null)
{
Debug.Assert(rateLimiter.IsValueCreated && rateLimiter.Value is ReplenishingRateLimiter);
((ReplenishingRateLimiter)rateLimiter.Value).TryReplenish();
throw new AggregateException(aggregateExceptions);
}
}
}
......
......@@ -32,8 +32,6 @@ public async Task WaitAsyncThrowsWhenPassedACanceledToken()
async () => await limiter.WaitAsync(string.Empty, 1, new CancellationToken(true)));
}
// Create
[Fact]
public void Create_AcquireCallsUnderlyingPartitionsLimiter()
{
......@@ -276,6 +274,29 @@ public void Create_DisposeDisposesAllLimiters()
Assert.Equal(1, limiterFactory.Limiters[1].Limiter.DisposeCallCount);
}
[Fact]
public void Create_DisposeWithThrowingDisposes_DisposesAllLimiters()
{
var limiter1 = new CustomizableLimiter();
limiter1.DisposeImpl = _ => throw new Exception();
var limiter2 = new CustomizableLimiter();
limiter2.DisposeImpl = _ => throw new Exception();
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.Create(1, _ => limiter1);
}
return RateLimitPartition.Create(2, _ => limiter2);
});
limiter.Acquire("1");
limiter.Acquire("2");
var ex = Assert.Throws<AggregateException>(() => limiter.Dispose());
Assert.Equal(2, ex.InnerExceptions.Count);
}
[Fact]
public void Create_DisposeThrowsForFutureMethodCalls()
{
......@@ -320,6 +341,29 @@ public async Task Create_DisposeAsyncDisposesAllLimiters()
Assert.Equal(1, limiterFactory.Limiters[1].Limiter.DisposeAsyncCallCount);
}
[Fact]
public async Task Create_DisposeAsyncWithThrowingDisposes_DisposesAllLimiters()
{
var limiter1 = new CustomizableLimiter();
limiter1.DisposeAsyncCoreImpl = () => throw new Exception();
var limiter2 = new CustomizableLimiter();
limiter2.DisposeAsyncCoreImpl = () => throw new Exception();
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.Create(1, _ => limiter1);
}
return RateLimitPartition.Create(2, _ => limiter2);
});
limiter.Acquire("1");
limiter.Acquire("2");
var ex = await Assert.ThrowsAsync<AggregateException>(() => limiter.DisposeAsync().AsTask());
Assert.Equal(2, ex.InnerExceptions.Count);
}
[Fact]
public async Task Create_WithTokenBucketReplenishesAutomatically()
{
......@@ -403,6 +447,155 @@ public async Task Create_CancellationTokenPassedToUnderlyingLimiter()
await Assert.ThrowsAsync<TaskCanceledException>(async () => await waitTask);
}
[Fact]
public async Task IdleLimiterIsCleanedUp()
{
CustomizableLimiter innerLimiter = null;
var factoryCallCount = 0;
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
return RateLimitPartition.Create(1, _ =>
{
factoryCallCount++;
innerLimiter = new CustomizableLimiter();
return innerLimiter;
});
});
var timerLoopMethod = StopTimerAndGetTimerFunc(limiter);
var lease = limiter.Acquire("");
Assert.True(lease.IsAcquired);
Assert.Equal(1, factoryCallCount);
var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
innerLimiter.DisposeAsyncCoreImpl = () =>
{
tcs.SetResult(null);
return default;
};
innerLimiter.IdleDurationImpl = () => TimeSpan.FromMinutes(1);
await timerLoopMethod();
// Limiter is disposed when timer runs and sees that IdleDuration is greater than idle limit
await tcs.Task;
innerLimiter.DisposeAsyncCoreImpl = () => default;
// Acquire will call limiter factory again as the limiter was disposed and removed
lease = limiter.Acquire("");
Assert.True(lease.IsAcquired);
Assert.Equal(2, factoryCallCount);
}
[Fact]
public async Task AllIdleLimitersCleanedUp_DisposeThrows()
{
CustomizableLimiter innerLimiter1 = null;
CustomizableLimiter innerLimiter2 = null;
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.Create(1, _ =>
{
innerLimiter1 = new CustomizableLimiter();
return innerLimiter1;
});
}
else
{
return RateLimitPartition.Create(2, _ =>
{
innerLimiter2 = new CustomizableLimiter();
return innerLimiter2;
});
}
});
var timerLoopMethod = StopTimerAndGetTimerFunc(limiter);
var lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);
Assert.NotNull(innerLimiter1);
limiter.Acquire("2");
Assert.NotNull(innerLimiter2);
var dispose1Called = false;
var dispose2Called = false;
innerLimiter1.DisposeAsyncCoreImpl = () =>
{
dispose1Called = true;
throw new Exception();
};
innerLimiter1.IdleDurationImpl = () => TimeSpan.FromMinutes(1);
innerLimiter2.DisposeAsyncCoreImpl = () =>
{
dispose2Called = true;
throw new Exception();
};
innerLimiter2.IdleDurationImpl = () => TimeSpan.FromMinutes(1);
// Run Timer
var ex = await Assert.ThrowsAsync<AggregateException>(() => timerLoopMethod());
Assert.True(dispose1Called);
Assert.True(dispose2Called);
Assert.Equal(2, ex.InnerExceptions.Count);
}
[Fact]
public async Task ThrowingTryReplenishDoesNotPreventIdleLimiterBeingCleanedUp()
{
CustomizableReplenishingLimiter replenishLimiter = new CustomizableReplenishingLimiter();
CustomizableLimiter idleLimiter = null;
var factoryCallCount = 0;
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.Create(1, _ =>
{
factoryCallCount++;
idleLimiter = new CustomizableLimiter();
return idleLimiter;
});
}
return RateLimitPartition.Create(2, _ =>
{
return replenishLimiter;
});
});
var timerLoopMethod = StopTimerAndGetTimerFunc(limiter);
// Add the replenishing limiter to the internal storage
limiter.Acquire("2");
var lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);
Assert.Equal(1, factoryCallCount);
// Start throwing from TryReplenish, this will happen the next time the Timer runs
replenishLimiter.TryReplenishImpl = () => throw new Exception();
var disposeTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
// This DisposeAsync will be called in the same Timer iteration as the throwing TryReplenish, so we block below on the disposeTcs to make sure DisposeAsync is called even with a throwing TryReplenish
idleLimiter.DisposeAsyncCoreImpl = () =>
{
disposeTcs.SetResult(null);
return default;
};
idleLimiter.IdleDurationImpl = () => TimeSpan.FromMinutes(1);
var ex = await Assert.ThrowsAsync<AggregateException>(() => timerLoopMethod());
Assert.Single(ex.InnerExceptions);
// Wait for Timer to run again which will see the throwing TryReplenish and an idle limiter it needs to clean-up
await disposeTcs.Task;
}
internal sealed class NotImplementedPartitionedRateLimiter<T> : PartitionedRateLimiter<T>
{
public override int GetAvailablePermits(T resourceID) => throw new NotImplementedException();
......@@ -424,7 +617,7 @@ internal sealed class TrackingRateLimiter : RateLimiter
public int DisposeCallCount => _disposeCallCount;
public int DisposeAsyncCallCount => _disposeAsyncCallCount;
public override TimeSpan? IdleDuration => throw new NotImplementedException();
public override TimeSpan? IdleDuration => null;
public override int GetAvailablePermits()
{
......@@ -500,5 +693,90 @@ public int GetHashCode([DisallowNull] int obj)
return obj.GetHashCode();
}
}
internal sealed class CustomizableLimiter : RateLimiter
{
public Func<TimeSpan?> IdleDurationImpl { get; set; } = () => null;
public override TimeSpan? IdleDuration => IdleDurationImpl();
public Func<int> GetAvailablePermitsImpl { get; set; } = () => throw new NotImplementedException();
public override int GetAvailablePermits() => GetAvailablePermitsImpl();
public Func<int, RateLimitLease> AcquireCoreImpl { get; set; } = _ => new Lease();
protected override RateLimitLease AcquireCore(int permitCount) => AcquireCoreImpl(permitCount);
public Func<int, CancellationToken, ValueTask<RateLimitLease>> WaitAsyncCoreImpl { get; set; } = (_, _) => new ValueTask<RateLimitLease>(new Lease());
protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken) => WaitAsyncCoreImpl(permitCount, cancellationToken);
public Action<bool> DisposeImpl { get; set; } = _ => { };
protected override void Dispose(bool disposing) => DisposeImpl(disposing);
public Func<ValueTask> DisposeAsyncCoreImpl { get; set; } = () => default;
protected override ValueTask DisposeAsyncCore() => DisposeAsyncCoreImpl();
private sealed class Lease : RateLimitLease
{
public override bool IsAcquired => true;
public override IEnumerable<string> MetadataNames => throw new NotImplementedException();
public override bool TryGetMetadata(string metadataName, out object? metadata) => throw new NotImplementedException();
}
}
internal sealed class CustomizableReplenishingLimiter : ReplenishingRateLimiter
{
public Func<TimeSpan?> IdleDurationImpl { get; set; } = () => null;
public override TimeSpan? IdleDuration => IdleDurationImpl();
public Func<int> GetAvailablePermitsImpl { get; set; } = () => throw new NotImplementedException();
public override int GetAvailablePermits() => GetAvailablePermitsImpl();
public Func<int, RateLimitLease> AcquireCoreImpl { get; set; } = _ => new Lease();
protected override RateLimitLease AcquireCore(int permitCount) => AcquireCoreImpl(permitCount);
public Func<int, CancellationToken, ValueTask<RateLimitLease>> WaitAsyncCoreImpl { get; set; } = (_, _) => new ValueTask<RateLimitLease>(new Lease());
protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken) => WaitAsyncCoreImpl(permitCount, cancellationToken);
public Func<ValueTask> DisposeAsyncCoreImpl { get; set; } = () => default;
protected override ValueTask DisposeAsyncCore() => DisposeAsyncCoreImpl();
public override bool IsAutoReplenishing => false;
public override TimeSpan ReplenishmentPeriod => throw new NotImplementedException();
public Func<bool> TryReplenishImpl { get; set; } = () => true;
public override bool TryReplenish() => TryReplenishImpl();
private sealed class Lease : RateLimitLease
{
public override bool IsAcquired => true;
public override IEnumerable<string> MetadataNames => throw new NotImplementedException();
public override bool TryGetMetadata(string metadataName, out object? metadata) => throw new NotImplementedException();
}
}
Func<Task> StopTimerAndGetTimerFunc<T>(PartitionedRateLimiter<T> limiter)
{
var innerTimer = limiter.GetType().GetField("_timer", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance);
Assert.NotNull(innerTimer);
var timerStopMethod = innerTimer.FieldType.GetMethod("Stop");
Assert.NotNull(timerStopMethod);
// Stop the current Timer so it doesn't fire unexpectedly
timerStopMethod.Invoke(innerTimer.GetValue(limiter), Array.Empty<object>());
// Create a new Timer object so that disposing the PartitionedRateLimiter doesn't fail with an ODE, but this new Timer wont actually do anything
var timerCtor = innerTimer.FieldType.GetConstructor(new Type[] { typeof(TimeSpan), typeof(TimeSpan) });
Assert.NotNull(timerCtor);
var newTimer = timerCtor.Invoke(new object[] { TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(10) });
Assert.NotNull(newTimer);
innerTimer.SetValue(limiter, newTimer);
var timerLoopMethod = limiter.GetType().GetMethod("Heartbeat", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance);
Assert.NotNull(timerLoopMethod);
return () => (Task)timerLoopMethod.Invoke(limiter, Array.Empty<object>());
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册