未验证 提交 f4df53be 编写于 作者: L Levi Broderick 提交者: GitHub

Make TranscodingStream resilient against partial reads (#50886)

上级 b1a41787
......@@ -312,18 +312,28 @@ public override int Read(Span<byte> buffer)
try
{
// Beware: Use our constant value instead of 'rentedBytes.Length' for the count
// parameter below. The reason for this is that the array pool could've returned
// a larger-than-expected array, but our worst-case expansion calculations
// performed earlier didn't take that into account.
int pendingReadDataPopulatedJustNow;
bool isEofReached;
int innerBytesReadJustNow = _innerStream.Read(rentedBytes, 0, DefaultReadByteBufferSize);
bool isEofReached = (innerBytesReadJustNow == 0);
do
{
// Beware: Use our constant value instead of 'rentedBytes.Length' for the count
// parameter below. The reason for this is that the array pool could've returned
// a larger-than-expected array, but our worst-case expansion calculations
// performed earlier didn't take that into account.
// convert bytes [inner] -> chars, then convert chars -> bytes [this]
int innerBytesReadJustNow = _innerStream.Read(rentedBytes, 0, DefaultReadByteBufferSize);
isEofReached = (innerBytesReadJustNow == 0);
int charsDecodedJustNow = _innerDecoder.GetChars(rentedBytes, 0, innerBytesReadJustNow, rentedChars, 0, flush: isEofReached);
int pendingReadDataPopulatedJustNow = _thisEncoder.GetBytes(rentedChars, 0, charsDecodedJustNow, _readBuffer, 0, flush: isEofReached);
// Convert bytes [inner] -> chars, then convert chars -> bytes [this].
// We can't return 0 to our caller until inner stream EOF has been reached. But if the
// inner stream returns a non-empty but incomplete buffer, GetBytes may return 0 anyway
// since it can't yet make forward progress on the input data. If this happens, we'll
// loop so that we don't return 0 to our caller until we truly see inner stream EOF.
int charsDecodedJustNow = _innerDecoder.GetChars(rentedBytes, 0, innerBytesReadJustNow, rentedChars, 0, flush: isEofReached);
pendingReadDataPopulatedJustNow = _thisEncoder.GetBytes(rentedChars, 0, charsDecodedJustNow, _readBuffer, 0, flush: isEofReached);
} while (!isEofReached && pendingReadDataPopulatedJustNow == 0);
_readBufferOffset = 0;
_readBufferCount = pendingReadDataPopulatedJustNow;
......@@ -381,18 +391,28 @@ async ValueTask<int> ReadAsyncCore(Memory<byte> buffer, CancellationToken cancel
try
{
// Beware: Use our constant value instead of 'rentedBytes.Length' when creating
// the Mem<byte> struct. The reason for this is that the array pool could've returned
// a larger-than-expected array, but our worst-case expansion calculations
// performed earlier didn't take that into account.
int innerBytesReadJustNow = await _innerStream.ReadAsync(rentedBytes.AsMemory(0, DefaultReadByteBufferSize), cancellationToken).ConfigureAwait(false);
bool isEofReached = (innerBytesReadJustNow == 0);
// convert bytes [inner] -> chars, then convert chars -> bytes [this]
int pendingReadDataPopulatedJustNow;
bool isEofReached;
int charsDecodedJustNow = _innerDecoder.GetChars(rentedBytes, 0, innerBytesReadJustNow, rentedChars, 0, flush: isEofReached);
int pendingReadDataPopulatedJustNow = _thisEncoder.GetBytes(rentedChars, 0, charsDecodedJustNow, _readBuffer, 0, flush: isEofReached);
do
{
// Beware: Use our constant value instead of 'rentedBytes.Length' when creating
// the Mem<byte> struct. The reason for this is that the array pool could've returned
// a larger-than-expected array, but our worst-case expansion calculations
// performed earlier didn't take that into account.
int innerBytesReadJustNow = await _innerStream.ReadAsync(rentedBytes.AsMemory(0, DefaultReadByteBufferSize), cancellationToken).ConfigureAwait(false);
isEofReached = (innerBytesReadJustNow == 0);
// Convert bytes [inner] -> chars, then convert chars -> bytes [this].
// We can't return 0 to our caller until inner stream EOF has been reached. But if the
// inner stream returns a non-empty but incomplete buffer, GetBytes may return 0 anyway
// since it can't yet make forward progress on the input data. If this happens, we'll
// loop so that we don't return 0 to our caller until we truly see inner stream EOF.
int charsDecodedJustNow = _innerDecoder.GetChars(rentedBytes, 0, innerBytesReadJustNow, rentedChars, 0, flush: isEofReached);
pendingReadDataPopulatedJustNow = _thisEncoder.GetBytes(rentedChars, 0, charsDecodedJustNow, _readBuffer, 0, flush: isEofReached);
} while (!isEofReached && pendingReadDataPopulatedJustNow == 0);
_readBufferOffset = 0;
_readBufferCount = pendingReadDataPopulatedJustNow;
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.IO.Tests;
using System.Threading;
using System.Threading.Tasks;
......@@ -336,10 +338,10 @@ private void RunReadTest(Func<Stream, MemoryStream, int> callback)
Assert.Equal(-1, transcodingStream.ReadByte()); // should've reached EOF
// Now put some invalid data into the inner stream as EOF.
// Now put some invalid data into the inner stream, followed by EOF, and ensure we get U+FFFD back out.
innerStream.SetLength(0); // reset
innerStream.WriteByte(0xC0);
innerStream.WriteByte(0xC0); // [ C0 ] is never valid in UTF-8
innerStream.Position = 0;
sink.SetLength(0); // reset
......@@ -353,6 +355,22 @@ private void RunReadTest(Func<Stream, MemoryStream, int> callback)
Assert.Equal("[FFFD]", ErrorCheckingAsciiEncoding.GetString(sink.ToArray()));
Assert.Equal(-1, transcodingStream.ReadByte()); // should've reached EOF
// Now put some incomplete data into the inner stream, followed by EOF, and ensure we get U+FFFD back out.
innerStream.SetLength(0); // reset
innerStream.WriteByte(0xC2); // [ C2 ] must be followed by [ 80..BF ] in UTF-8
innerStream.Position = 0;
sink.SetLength(0); // reset
do
{
numBytesReadJustNow = callback(transcodingStream, sink);
Assert.True(numBytesReadJustNow >= 0);
} while (numBytesReadJustNow > 0);
Assert.Equal("[FFFD]", ErrorCheckingAsciiEncoding.GetString(sink.ToArray()));
Assert.Equal(-1, transcodingStream.ReadByte()); // should've reached EOF
void RunOneTestIteration(int stringLength)
{
sink.SetLength(0); // reset
......@@ -452,6 +470,56 @@ public async Task ReadAsync_Memory(int bufferLength)
});
}
[Fact]
public async Task ReadAsync_LoopsWhenPartialDataReceived()
{
// Validates that the TranscodingStream will loop instead of returning 0
// if the inner stream read partial data and GetBytes cannot make forward progress.
using AsyncComms comms = new AsyncComms();
Stream transcodingStream = Encoding.CreateTranscodingStream(comms.ReadStream, Encoding.UTF8, Encoding.UTF8);
// First, ensure that writing [ C0 ] (always invalid UTF-8) to the stream
// causes the reader to return immediately with fallback behavior.
byte[] readBuffer = new byte[1024];
comms.WriteBytes(new byte[] { 0xC0 });
int numBytesRead = await transcodingStream.ReadAsync(readBuffer.AsMemory());
Assert.Equal(new byte[] { 0xEF, 0xBF, 0xBD }, readBuffer[0..numBytesRead]); // fallback substitution
// Next, ensure that writing [ C2 ] (partial UTF-8, needs more data) to the stream
// causes the reader to asynchronously loop, returning "not yet complete".
readBuffer = new byte[1024];
comms.WriteBytes(new byte[] { 0xC2 });
ValueTask<int> task = transcodingStream.ReadAsync(readBuffer.AsMemory());
Assert.False(task.IsCompleted);
comms.WriteBytes(new byte[] { 0x80 }); // [ C2 80 ] is valid UTF-8
numBytesRead = await task; // should complete successfully
Assert.Equal(new byte[] { 0xC2, 0x80 }, readBuffer[0..numBytesRead]);
// Finally, ensure that writing [ C2 ] (partial UTF-8, needs more data) to the stream
// followed by EOF causes the reader to perform substitution before returning EOF.
readBuffer = new byte[1024];
comms.WriteBytes(new byte[] { 0xC2 });
task = transcodingStream.ReadAsync(readBuffer.AsMemory());
Assert.False(task.IsCompleted);
comms.WriteEof();
numBytesRead = await task; // should complete successfully
Assert.Equal(new byte[] { 0xEF, 0xBF, 0xBD }, readBuffer[0..numBytesRead]); // fallback substitution
// Next call really should return "EOF reached"
readBuffer = new byte[1024];
Assert.Equal(0, await transcodingStream.ReadAsync(readBuffer.AsMemory()));
}
[Fact]
public void ReadAsync_WithInvalidArgs_Throws()
{
......@@ -510,10 +578,10 @@ private async Task RunReadTestAsync(Func<Stream, CancellationToken, MemoryStream
Assert.Equal(-1, await transcodingStream.ReadByteAsync(expectedCancellationToken)); // should've reached EOF
// Now put some invalid data into the inner stream as EOF.
// Now put some invalid data into the inner stream, followed by EOF, and ensure we get U+FFFD back out.
innerStream.SetLength(0); // reset
innerStream.WriteByte(0xC0);
innerStream.WriteByte(0xC0); // [ C0 ] is never valid in UTF-8
innerStream.Position = 0;
sink.SetLength(0); // reset
......@@ -527,6 +595,22 @@ private async Task RunReadTestAsync(Func<Stream, CancellationToken, MemoryStream
Assert.Equal("[FFFD]", ErrorCheckingAsciiEncoding.GetString(sink.ToArray()));
Assert.Equal(-1, await transcodingStream.ReadByteAsync(expectedCancellationToken)); // should've reached EOF
// Now put some incomplete data into the inner stream, followed by EOF, and ensure we get U+FFFD back out.
innerStream.SetLength(0); // reset
innerStream.WriteByte(0xC2); // [ C2 ] must be followed by [ 80..BF ] in UTF-8
innerStream.Position = 0;
sink.SetLength(0); // reset
do
{
numBytesReadJustNow = await callback(transcodingStream, expectedCancellationToken, sink);
Assert.True(numBytesReadJustNow >= 0);
} while (numBytesReadJustNow > 0);
Assert.Equal("[FFFD]", ErrorCheckingAsciiEncoding.GetString(sink.ToArray()));
Assert.Equal(-1, await transcodingStream.ReadByteAsync(expectedCancellationToken)); // should've reached EOF
async Task RunOneTestIteration(int stringLength)
{
sink.SetLength(0); // reset
......@@ -942,5 +1026,49 @@ public override int GetChars(byte[] bytes, int byteIndex, int byteCount, char[]
public override byte[] GetPreamble() => Array.Empty<byte>();
}
// A helper type that allows synchronously writing to a stream while asynchronously
// reading from it.
private sealed class AsyncComms : IDisposable
{
private readonly BlockingCollection<byte[]> _blockingCollection;
private readonly PipeWriter _writer;
public AsyncComms()
{
_blockingCollection = new BlockingCollection<byte[]>();
var pipe = new Pipe();
ReadStream = pipe.Reader.AsStream();
_writer = pipe.Writer;
Task.Run(_DrainWorker);
}
public Stream ReadStream { get; }
public void Dispose()
{
_blockingCollection.Dispose();
}
public void WriteBytes(ReadOnlySpan<byte> bytes)
{
_blockingCollection.Add(bytes.ToArray());
}
public void WriteEof()
{
_blockingCollection.Add(null);
}
private async Task _DrainWorker()
{
byte[] buffer;
while ((buffer = _blockingCollection.Take()) is not null)
{
await _writer.WriteAsync(buffer);
}
_writer.Complete();
}
}
}
}
......@@ -93,5 +93,6 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="$(CommonTestPath)StreamConformanceTests\StreamConformanceTests.csproj" />
<ProjectReference Include="$(LibrariesProjectRoot)System.IO.Pipelines\src\System.IO.Pipelines.csproj" />
</ItemGroup>
</Project>
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册