未验证 提交 419f30d9 编写于 作者: E Emmanuel André 提交者: GitHub

Improve cancellation in StreamPipeReader.ReadAtLeastAsync (#53306)

* Improve cancellation in StreamPipeReader.ReadAtLeastAsync

* Introduce task variable
Co-authored-by: NStephen Toub <stoub@microsoft.com>

* Fix tests

* Expect TaskCanceledException on canceled token
Co-authored-by: NStephen Toub <stoub@microsoft.com>
上级 e16be6db
......@@ -664,6 +664,11 @@ internal ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationTo
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}
if (token.IsCancellationRequested)
{
return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(token));
}
CompletionData completionData = default;
ValueTask<ReadResult> result;
lock (SyncObj)
......@@ -715,6 +720,11 @@ internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}
if (token.IsCancellationRequested)
{
return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(token));
}
ValueTask<ReadResult> result;
lock (SyncObj)
{
......
......@@ -194,7 +194,10 @@ public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationTo
// TODO ReadyAsync needs to throw if there are overlapping reads.
ThrowIfCompleted();
cancellationToken.ThrowIfCancellationRequested();
if (cancellationToken.IsCancellationRequested)
{
return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(cancellationToken));
}
// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
......@@ -273,7 +276,10 @@ protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, C
// TODO ReadyAsync needs to throw if there are overlapping reads.
ThrowIfCompleted();
cancellationToken.ThrowIfCancellationRequested();
if (cancellationToken.IsCancellationRequested)
{
return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(cancellationToken));
}
// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
......
......@@ -178,7 +178,7 @@ public async Task CancelingBetweenReadsThrowsOperationCancelledException()
Pipe.Writer.WriteEmpty(10);
await Pipe.Writer.FlushAsync();
await Assert.ThrowsAsync<OperationCanceledException>(() => task);
await Assert.ThrowsAsync<TaskCanceledException>(() => task);
}
[Fact]
......
......@@ -141,7 +141,8 @@ public async Task CanReadAtLeast(int bufferSize, bool bufferedRead)
[Fact]
public Task ReadAtLeastAsyncThrowsIfPassedCanceledCancellationToken()
{
return Assert.ThrowsAsync<OperationCanceledException>(async () => await PipeReader.ReadAtLeastAsync(0, new CancellationToken(true)));
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(0, new CancellationToken(canceled: true));
return Assert.ThrowsAsync<TaskCanceledException>(async () => await task);
}
[Fact]
......
......@@ -362,12 +362,10 @@ public void ReadAsyncReturnsIsCancelOnCancelPendingReadBeforeGetResult()
}
[Fact]
public void ReadAsyncThrowsIfPassedCanceledCancellationToken()
public Task ReadAsyncThrowsIfPassedCanceledCancellationToken()
{
var cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Cancel();
Assert.Throws<OperationCanceledException>(() => Pipe.Reader.ReadAsync(cancellationTokenSource.Token));
ValueTask<ReadResult> task = Pipe.Reader.ReadAsync(new CancellationToken(canceled: true));
return Assert.ThrowsAsync<TaskCanceledException>(async () => await task);
}
[Fact]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册