未验证 提交 d6fef3b7 编写于 作者: B Brennan 提交者: GitHub

Add TranslateKey to PartitionedRateLimiter (#69407)

上级 2fc91080
......@@ -77,6 +77,7 @@ public abstract partial class PartitionedRateLimiter<TResource> : System.IAsyncD
public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
public abstract int GetAvailablePermits(TResource resource);
public System.Threading.RateLimiting.PartitionedRateLimiter<TOuter> TranslateKey<TOuter>(System.Func<TOuter, TResource> keyAdapter) { throw null; }
public System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> WaitAndAcquireAsync(TResource resource, int permitCount = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected abstract System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> WaitAndAcquireAsyncCore(TResource resource, int permitCount, System.Threading.CancellationToken cancellationToken);
}
......
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(NetCoreAppCurrent);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum)</TargetFrameworks>
<IsPackable>true</IsPackable>
......@@ -37,6 +37,7 @@ System.Threading.RateLimiting.RateLimitLease</PackageDescription>
<Compile Include="System\Threading\RateLimiting\TokenBucketRateLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\TokenBucketRateLimiterOptions.cs" />
<Compile Include="$(CommonPath)System\Collections\Generic\Deque.cs" Link="Common\System\Collections\Generic\Deque.cs" />
<Compile Include="System\Threading\RateLimiting\TranslatingLimiter.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
......
......@@ -119,5 +119,23 @@ public async ValueTask DisposeAsync()
// Suppress finalization.
GC.SuppressFinalize(this);
}
/// <summary>
/// Translates PartitionedRateLimiter&lt;TOuter&gt; into the current <see cref="PartitionedRateLimiter{TResource}"/>
/// using the <paramref name="keyAdapter"/> to translate <typeparamref name="TOuter"/> to <typeparamref name="TResource"/>.
/// </summary>
/// <typeparam name="TOuter">The type to translate into <typeparamref name="TResource"/>.</typeparam>
/// <param name="keyAdapter">The function to be called every time a <typeparamref name="TOuter"/> is passed to
/// PartitionedRateLimiter&lt;TOuter&gt;.Acquire(TOuter, int) or PartitionedRateLimiter&lt;TOuter&gt;.WaitAsync(TOuter, int, CancellationToken).</param>
/// <remarks><see cref="PartitionedRateLimiter{TResource}.Dispose()"/> or <see cref="PartitionedRateLimiter{TResource}.DisposeAsync()"/> does not dispose the wrapped <see cref="PartitionedRateLimiter{TResource}"/>.</remarks>
/// <returns>A new PartitionedRateLimiter&lt;TOuter&gt; that translates <typeparamref name="TOuter"/>
/// to <typeparamref name="TResource"/> and calls the inner <see cref="PartitionedRateLimiter{TResource}"/>.</returns>
public PartitionedRateLimiter<TOuter> TranslateKey<TOuter>(Func<TOuter, TResource> keyAdapter)
{
// REVIEW: Do we want to have an option to dispose the inner limiter?
// Should the default be to dispose the inner limiter and have an option to not dispose it?
// See Stream wrappers like SslStream for prior-art
return new TranslatingLimiter<TResource, TOuter>(this, keyAdapter);
}
}
}
......@@ -86,7 +86,7 @@ public bool GetResult()
private void Tick()
{
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
Action? continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
continuation?.Invoke();
}
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace System.Threading.RateLimiting
{
internal sealed class TranslatingLimiter<TInner, TResource> : PartitionedRateLimiter<TResource>
{
private readonly PartitionedRateLimiter<TInner> _innerRateLimiter;
private readonly Func<TResource, TInner> _keyAdapter;
private bool _disposed;
public TranslatingLimiter(PartitionedRateLimiter<TInner> inner, Func<TResource, TInner> keyAdapter)
{
_innerRateLimiter = inner;
_keyAdapter = keyAdapter;
}
public override int GetAvailablePermits(TResource resource)
{
ThrowIfDispose();
TInner key = _keyAdapter(resource);
return _innerRateLimiter.GetAvailablePermits(key);
}
protected override RateLimitLease AcquireCore(TResource resource, int permitCount)
{
ThrowIfDispose();
TInner key = _keyAdapter(resource);
return _innerRateLimiter.Acquire(key, permitCount);
}
protected override ValueTask<RateLimitLease> WaitAndAcquireAsyncCore(TResource resource, int permitCount, CancellationToken cancellationToken)
{
ThrowIfDispose();
TInner key = _keyAdapter(resource);
return _innerRateLimiter.WaitAndAcquireAsync(key, permitCount, cancellationToken);
}
protected override void Dispose(bool disposing)
{
_disposed = true;
base.Dispose(disposing);
}
protected override ValueTask DisposeAsyncCore()
{
_disposed = true;
return base.DisposeAsyncCore();
}
private void ThrowIfDispose()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(PartitionedRateLimiter));
}
}
}
}
......@@ -595,5 +595,197 @@ public async Task ThrowingTryReplenishDoesNotPreventIdleLimiterBeingCleanedUp()
// Wait for Timer to run again which will see the throwing TryReplenish and an idle limiter it needs to clean-up
await disposeTcs.Task;
}
// Translate
[Fact]
public void Translate_AcquirePassesThroughToInnerLimiter()
{
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
else
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
});
var translateCallCount = 0;
var translateLimiter = limiter.TranslateKey<int>(i =>
{
translateCallCount++;
return i.ToString();
});
var lease = translateLimiter.Acquire(1);
Assert.True(lease.IsAcquired);
Assert.Equal(1, translateCallCount);
var lease2 = limiter.Acquire("1");
Assert.False(lease2.IsAcquired);
lease.Dispose();
lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);
Assert.Equal(1, translateCallCount);
}
[Fact]
public async Task Translate_WaitAsyncPassesThroughToInnerLimiter()
{
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
else
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
});
var translateCallCount = 0;
var translateLimiter = limiter.TranslateKey<int>(i =>
{
translateCallCount++;
return i.ToString();
});
var lease = await translateLimiter.WaitAndAcquireAsync(1);
Assert.True(lease.IsAcquired);
Assert.Equal(1, translateCallCount);
var lease2 = limiter.Acquire("1");
Assert.False(lease2.IsAcquired);
lease.Dispose();
lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);
Assert.Equal(1, translateCallCount);
}
[Fact]
public void Translate_GetAvailablePermitsPassesThroughToInnerLimiter()
{
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
else
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
});
var translateCallCount = 0;
var translateLimiter = limiter.TranslateKey<int>(i =>
{
translateCallCount++;
return i.ToString();
});
Assert.Equal(1, translateLimiter.GetAvailablePermits(1));
Assert.Equal(1, translateCallCount);
var lease = translateLimiter.Acquire(1);
Assert.True(lease.IsAcquired);
Assert.Equal(2, translateCallCount);
Assert.Equal(0, translateLimiter.GetAvailablePermits(1));
Assert.Equal(3, translateCallCount);
var lease2 = limiter.Acquire("1");
Assert.False(lease2.IsAcquired);
lease.Dispose();
Assert.Equal(1, translateLimiter.GetAvailablePermits(1));
Assert.Equal(4, translateCallCount);
lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);
Assert.Equal(0, translateLimiter.GetAvailablePermits(1));
Assert.Equal(5, translateCallCount);
}
[Fact]
public void Translate_DisposeDoesNotDisposeInnerLimiter()
{
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
else
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
});
var translateCallCount = 0;
var translateLimiter = limiter.TranslateKey<int>(i =>
{
translateCallCount++;
return i.ToString();
});
translateLimiter.Dispose();
var lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);
Assert.Throws<ObjectDisposedException>(() => translateLimiter.Acquire(1));
}
[Fact]
public async Task Translate_DisposeAsyncDoesNotDisposeInnerLimiter()
{
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
else
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
});
var translateCallCount = 0;
var translateLimiter = limiter.TranslateKey<int>(i =>
{
translateCallCount++;
return i.ToString();
});
await translateLimiter.DisposeAsync();
var lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);
Assert.Throws<ObjectDisposedException>(() => translateLimiter.Acquire(1));
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册