未验证 提交 bd2931ea 编写于 作者: W Wraith 提交者: GitHub

Add PipeWriter CanGetUnflushedBytes and UnflushedBytes properties (#54164)

* Add PipeWriter CanGetUnflushedBytes and UnflushedBytes properties and covering tests

* address feedback
上级 2832987b
......@@ -71,6 +71,7 @@ public abstract partial class PipeWriter : System.Buffers.IBufferWriter<byte>
public abstract void Advance(int bytes);
public virtual System.IO.Stream AsStream(bool leaveOpen = false) { throw null; }
public abstract void CancelPendingFlush();
public virtual bool CanGetUnflushedBytes => throw null;
public abstract void Complete(System.Exception? exception = null);
public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception? exception = null) { throw null; }
protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
......@@ -80,6 +81,7 @@ public abstract partial class PipeWriter : System.Buffers.IBufferWriter<byte>
public abstract System.Span<byte> GetSpan(int sizeHint = 0);
[System.ObsoleteAttribute("OnReaderCompleted may not be invoked on all implementations of PipeWriter. This will be removed in a future release.")]
public virtual void OnReaderCompleted(System.Action<System.Exception?, object?> callback, object? state) { }
public virtual long UnflushedBytes => throw null;
public virtual System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> WriteAsync(System.ReadOnlyMemory<byte> source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public readonly partial struct ReadResult
......
......@@ -168,4 +168,7 @@
<data name="WritingAfterCompleted" xml:space="preserve">
<value>Writing is not allowed after writer was completed.</value>
</data>
<data name="UnflushedBytesNotSupported" xml:space="preserve">
<value>UnflushedBytes is not supported.</value>
</data>
</root>
\ No newline at end of file
......@@ -23,6 +23,8 @@ public DefaultPipeWriter(Pipe pipe)
public override void CancelPendingFlush() => _pipe.CancelPendingFlush();
public override bool CanGetUnflushedBytes => true;
#pragma warning disable CS0672 // Member overrides obsolete member
public override void OnReaderCompleted(Action<Exception?, object?> callback, object? state) => _pipe.OnReaderCompleted(callback, state);
#pragma warning restore CS0672 // Member overrides obsolete member
......@@ -41,6 +43,8 @@ public DefaultPipeWriter(Pipe pipe)
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _pipe.OnFlushAsyncCompleted(continuation, state, flags);
public override long UnflushedBytes => _pipe.GetUnflushedBytes();
public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
return _pipe.WriteAsync(source, cancellationToken);
......
......@@ -1009,6 +1009,8 @@ internal FlushResult GetFlushAsyncResult()
return result;
}
internal long GetUnflushedBytes() => _unflushedBytes;
private void GetFlushResult(ref FlushResult result)
{
// Change the state from to be canceled -> observed
......
......@@ -36,6 +36,10 @@ public virtual ValueTask CompleteAsync(Exception? exception = null)
/// <remarks>The canceled <see cref="System.IO.Pipelines.PipeWriter.FlushAsync(System.Threading.CancellationToken)" /> or <see cref="System.IO.Pipelines.PipeWriter.WriteAsync(System.ReadOnlyMemory{byte},System.Threading.CancellationToken)" /> operation returns a <see cref="System.IO.Pipelines.FlushResult" /> where <see cref="System.IO.Pipelines.FlushResult.IsCanceled" /> is <see langword="true" />.</remarks>
public abstract void CancelPendingFlush();
/// <summary>Gets a value that indicates whether the current <see cref="System.IO.Pipelines.PipeWriter" /> supports reporting the count of unflushed bytes.</summary>
/// <value><see langword="true" />If a class derived from <see cref="System.IO.Pipelines.PipeWriter" /> does not support getting the unflushed bytes, calls to <see cref="System.IO.Pipelines.PipeWriter.UnflushedBytes" /> throw <see cref="System.NotImplementedException" />.</value>
public virtual bool CanGetUnflushedBytes => false;
/// <summary>Registers a callback that executes when the <see cref="System.IO.Pipelines.PipeReader" /> side of the pipe is completed.</summary>
/// <param name="callback">The callback to register.</param>
/// <param name="state">The state object to pass to <paramref name="callback" /> when it's invoked.</param>
......@@ -143,5 +147,11 @@ protected internal virtual async Task CopyFromAsync(Stream source, CancellationT
}
}
}
/// <summary>
/// When overridden in a derived class, gets the count of unflushed bytes within the current writer.
/// </summary>
/// <exception cref="System.NotImplementedException">The <see cref="System.IO.Pipelines.PipeWriter"/> does not support getting the unflushed byte count.</exception>
public virtual long UnflushedBytes => throw ThrowHelper.CreateNotSupportedException_UnflushedBytes();
}
}
......@@ -205,6 +205,9 @@ public override void CancelPendingFlush()
Cancel();
}
/// <inheritdoc />
public override bool CanGetUnflushedBytes => true;
/// <inheritdoc />
public override void Complete(Exception? exception = null)
{
......@@ -259,6 +262,9 @@ public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellation
return FlushAsyncInternal(writeToStream: true, data: Memory<byte>.Empty, cancellationToken);
}
/// <inheritdoc />
public override long UnflushedBytes => _bytesBuffered;
public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
{
return FlushAsyncInternal(writeToStream: true, data: source, cancellationToken);
......
......@@ -83,6 +83,11 @@ internal static class ThrowHelper
public static void ThrowInvalidOperationException_InvalidZeroByteRead() => throw CreateInvalidOperationException_InvalidZeroByteRead();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_InvalidZeroByteRead() => new InvalidOperationException(SR.InvalidZeroByteRead);
[DoesNotReturn]
public static void ThrowNotSupported_UnflushedBytes() => throw CreateNotSupportedException_UnflushedBytes();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateNotSupportedException_UnflushedBytes() => new NotSupportedException(SR.UnflushedBytesNotSupported);
}
internal enum ExceptionArgument
......
......@@ -197,7 +197,7 @@ public async Task WritesUsingGetSpanWorks()
await writer.FlushAsync();
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
ReadResult readResult = await pipe.Reader.ReadAsync();
Assert.Equal(bytes, readResult.Buffer.ToArray());
pipe.Reader.AdvanceTo(readResult.Buffer.End);
......@@ -220,7 +220,7 @@ public async Task WritesUsingGetMemoryWorks()
await writer.FlushAsync();
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
ReadResult readResult = await pipe.Reader.ReadAsync();
Assert.Equal(bytes, readResult.Buffer.ToArray());
pipe.Reader.AdvanceTo(readResult.Buffer.End);
......@@ -290,6 +290,7 @@ public async Task GetMemoryFlushWithACompletedReaderNoops()
Assert.Equal(1, pool.CurrentlyRentedBlocks);
pipe.Writer.Complete();
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(0, Pipe.Writer.UnflushedBytes);
}
}
}
......@@ -26,6 +26,8 @@ public void NothingWrittenToStreamUnlessFlushed()
Assert.Equal(0, stream.Length);
writer.Complete();
}
[Fact]
......@@ -42,6 +44,7 @@ public void DataFlushedOnComplete()
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
Assert.Equal(bytes.Length, stream.Length);
Assert.Equal("Hello World", Encoding.ASCII.GetString(stream.ToArray()));
}
......@@ -62,6 +65,7 @@ public async Task DataFlushedOnCompleteAsync()
Assert.Equal(bytes.Length, stream.Length);
Assert.Equal("Hello World", Encoding.ASCII.GetString(stream.ToArray()));
Assert.Equal(0, writer.UnflushedBytes);
}
[Fact]
......@@ -132,6 +136,8 @@ public async Task DataWrittenOnFlushAsync()
Assert.Equal("Hello World", Encoding.ASCII.GetString(stream.ToArray()));
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
}
[Fact]
......@@ -143,6 +149,7 @@ public async Task FlushAsyncWithoutWritingDoesNotFlushAsync()
Assert.False(stream.FlushAsyncCalled);
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
}
[Fact]
......@@ -162,6 +169,7 @@ public async Task WritesUsingGetSpanWorks()
await writer.FlushAsync();
Assert.Equal(bytes, stream.ToArray());
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
}
[Fact]
......@@ -181,6 +189,7 @@ public async Task WritesUsingGetMemoryWorks()
await writer.FlushAsync();
Assert.Equal(bytes, stream.ToArray());
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
}
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
......@@ -397,6 +406,7 @@ public async Task WriteAsyncThrowsDuringMultiSegmentWriteCompleteReturnsAllMemor
Assert.Equal(1, pool.DisposedBlocks);
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(3, pool.DisposedBlocks);
}
......@@ -441,6 +451,7 @@ public void CompletingWithBufferedBytesStillReturnsMemoryToPool()
Assert.Equal(0, stream.Length);
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(1, pool.DisposedBlocks);
}
......@@ -462,7 +473,7 @@ public void GetMemorySameAsTheMaxPoolSizeUsesThePool()
Assert.Equal(0, pool.DisposedBlocks);
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(1, pool.DisposedBlocks);
}
......@@ -482,7 +493,7 @@ public void GetMemoryBiggerThanPoolSizeAllocatesArrayPoolArray()
Assert.Equal(0, pool.DisposedBlocks);
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
Assert.Equal(0, pool.CurrentlyRentedBlocks);
Assert.Equal(0, pool.DisposedBlocks);
}
......@@ -592,6 +603,25 @@ public async Task OperationCancelledExceptionNotSwallowedIfNotThrownFromSpecifie
await Assert.ThrowsAsync<OperationCanceledException>(async () => await writer.WriteAsync(new byte[1]));
}
[Fact]
public void UnflushedBytesWorks()
{
byte[] bytes = Encoding.ASCII.GetBytes("Hello World");
var stream = new MemoryStream();
PipeWriter writer = PipeWriter.Create(stream);
Assert.True(writer.CanGetUnflushedBytes);
bytes.AsSpan().CopyTo(writer.GetSpan(bytes.Length));
writer.Advance(bytes.Length);
Assert.Equal(bytes.Length, writer.UnflushedBytes);
writer.Complete();
Assert.Equal(0, writer.UnflushedBytes);
}
private class ThrowsOperationCanceledExceptionStream : WriteOnlyStream
{
public override void Write(byte[] buffer, int offset, int count)
......
......@@ -42,6 +42,7 @@
<Compile Include="Infrastructure\TestMemoryPool.cs" />
<Compile Include="StreamPipeWriterTests.cs" />
<Compile Include="Infrastructure\ThrowAfterNWritesStream.cs" />
<Compile Include="UnflushedBytesTests.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
<Compile Include="PipeLengthTests.cs" />
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.IO.Pipelines.Tests
{
public class UnflushedBytesTests : PipeTest
{
internal class MinimalPipeWriter : PipeWriter
{
public override void Advance(int bytes) => throw new NotImplementedException();
public override void CancelPendingFlush() => throw new NotImplementedException();
public override void Complete(Exception? exception = null) => throw new NotImplementedException();
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException();
public override Memory<byte> GetMemory(int sizeHint = 0) => throw new NotImplementedException();
public override Span<byte> GetSpan(int sizeHint = 0) => throw new NotImplementedException();
}
public UnflushedBytesTests() : base(0, 0)
{
}
[Fact]
public void NonOverriddenUnflushedBytesThrows()
{
MinimalPipeWriter writer = new MinimalPipeWriter();
Assert.False(writer.CanGetUnflushedBytes);
_ = Assert.Throws<NotSupportedException>(() => { long value = writer.UnflushedBytes; }); ;
}
[Fact]
public void UnflushedBytesWorks()
{
byte[] bytes = Encoding.ASCII.GetBytes("abcdefghijklmnopqrstuvwzyz");
Pipe.Writer.Write(bytes);
Assert.True(Pipe.Writer.CanGetUnflushedBytes);
Assert.Equal(bytes.Length,Pipe.Writer.UnflushedBytes);
_ = Pipe.Writer.FlushAsync().GetAwaiter().GetResult();
Assert.Equal(0, Pipe.Writer.UnflushedBytes);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册