未验证 提交 7e255ab7 编写于 作者: G Geoff Kizer 提交者: GitHub

minor quic improvements (#43151)

* minor quic improvements

* address PR feedback

* more feedback
Co-authored-by: NGeoffrey Kizer <geoffrek@windows.microsoft.com>
上级 12025658
......@@ -185,7 +185,7 @@ public void TryEnsureAvailableSpaceUpToLimit(int byteCount, int limit)
ArrayPool<byte>.Shared.Return(oldBytes);
}
Debug.Assert(byteCount <= AvailableLength);
Debug.Assert(byteCount <= AvailableLength || desiredSize == limit);
}
public void Grow()
......
......@@ -222,10 +222,22 @@ internal sealed class StreamState
public long _outboundErrorCode;
public long _inboundErrorCode;
private const int InitialBufferSize =
#if DEBUG
10;
#else
4096;
#endif
private const int MaxBufferSize =
#if DEBUG
4096;
#else
32 * 1024;
#endif
public StreamState(long streamId, bool bidirectional)
{
_streamId = streamId;
_outboundStreamBuffer = new StreamBuffer();
_outboundStreamBuffer = new StreamBuffer(initialBufferSize: InitialBufferSize, maxBufferSize: MaxBufferSize);
_inboundStreamBuffer = (bidirectional ? new StreamBuffer() : null);
}
}
......
......@@ -13,7 +13,7 @@ namespace System.IO
internal sealed class StreamBuffer : IDisposable
{
private ArrayBuffer _buffer; // mutable struct, do not make this readonly
private readonly int _maxSize;
private readonly int _maxBufferSize;
private bool _writeEnded;
private bool _readAborted;
private readonly ResettableValueTaskSource _readTaskSource;
......@@ -23,10 +23,10 @@ internal sealed class StreamBuffer : IDisposable
public const int DefaultInitialBufferSize = 4 * 1024;
public const int DefaultMaxBufferSize = 32 * 1024;
public StreamBuffer(int initialSize = DefaultInitialBufferSize, int maxSize = DefaultMaxBufferSize)
public StreamBuffer(int initialBufferSize = DefaultInitialBufferSize, int maxBufferSize = DefaultMaxBufferSize)
{
_buffer = new ArrayBuffer(initialSize, usePool: true);
_maxSize = maxSize;
_buffer = new ArrayBuffer(initialBufferSize, usePool: true);
_maxBufferSize = maxBufferSize;
_readTaskSource = new ResettableValueTaskSource();
_writeTaskSource = new ResettableValueTaskSource();
}
......@@ -86,7 +86,7 @@ public int WriteBytesAvailable
throw new InvalidOperationException();
}
return _maxSize - _buffer.ActiveLength;
return _maxBufferSize - _buffer.ActiveLength;
}
}
}
......@@ -108,7 +108,7 @@ public int WriteBytesAvailable
return (false, buffer.Length);
}
_buffer.TryEnsureAvailableSpaceUpToLimit(buffer.Length, _maxSize);
_buffer.TryEnsureAvailableSpaceUpToLimit(buffer.Length, _maxBufferSize);
int bytesWritten = Math.Min(buffer.Length, _buffer.AvailableLength);
if (bytesWritten > 0)
......@@ -299,7 +299,7 @@ private sealed class ResettableValueTaskSource : IValueTaskSource
// and dispose/clear the cancellation registration in GetResult to guarantee it will not affect subsequent waiters.
// The rest of the logic is deferred to ManualResetValueTaskSourceCore.
private ManualResetValueTaskSourceCore<bool> _waitSource = new ManualResetValueTaskSourceCore<bool> { RunContinuationsAsynchronously = true }; // mutable struct, do not make this readonly
private ManualResetValueTaskSourceCore<bool> _waitSource; // mutable struct, do not make this readonly
private CancellationToken _waitSourceCancellationToken;
private CancellationTokenRegistration _waitSourceCancellation;
private int _hasWaiter;
......@@ -340,7 +340,11 @@ private void CancelWaiter()
public void Reset()
{
Debug.Assert(_hasWaiter == 0);
if (_hasWaiter != 0)
{
Debug.Fail("Concurrent use is not supported");
throw new InvalidOperationException("Concurrent use is not supported");
}
_waitSource.Reset();
Volatile.Write(ref _hasWaiter, 1);
......
......@@ -228,6 +228,85 @@ public async Task GetStreamIdWithoutStartWorks()
Assert.Equal(0, clientStream.StreamId);
}
[Fact]
public async Task LargeDataSentAndReceived()
{
byte[] data = Enumerable.Range(0, 64 * 1024).Select(x => (byte)x).ToArray();
const int NumberOfWrites = 256; // total sent = 16M
using QuicListener listener = CreateQuicListener();
for (int j = 0; j < 100; j++)
{
Task listenTask = Task.Run(async () =>
{
using QuicConnection connection = await listener.AcceptConnectionAsync();
await using QuicStream stream = await connection.AcceptStreamAsync();
byte[] buffer = new byte[data.Length];
for (int i = 0; i < NumberOfWrites; i++)
{
int totalBytesRead = 0;
while (totalBytesRead < data.Length)
{
int bytesRead = await stream.ReadAsync(buffer.AsMemory().Slice(totalBytesRead));
Assert.NotEqual(0, bytesRead);
totalBytesRead += bytesRead;
}
Assert.Equal(data.Length, totalBytesRead);
Assert.True(data.AsSpan().SequenceEqual(buffer));
}
for (int i = 0; i < NumberOfWrites; i++)
{
await stream.WriteAsync(data);
}
await stream.WriteAsync(Memory<byte>.Empty, endStream: true);
await stream.ShutdownWriteCompleted();
await connection.CloseAsync(errorCode: 0);
});
Task clientTask = Task.Run(async () =>
{
using QuicConnection connection = CreateQuicConnection(listener.ListenEndPoint);
await connection.ConnectAsync();
await using QuicStream stream = connection.OpenBidirectionalStream();
byte[] buffer = new byte[data.Length];
for (int i = 0; i < NumberOfWrites; i++)
{
await stream.WriteAsync(data);
}
await stream.WriteAsync(Memory<byte>.Empty, endStream: true);
for (int i = 0; i < NumberOfWrites; i++)
{
int totalBytesRead = 0;
while (totalBytesRead < data.Length)
{
int bytesRead = await stream.ReadAsync(buffer.AsMemory().Slice(totalBytesRead));
Assert.NotEqual(0, bytesRead);
totalBytesRead += bytesRead;
}
Assert.Equal(data.Length, totalBytesRead);
Assert.True(data.AsSpan().SequenceEqual(buffer));
}
await stream.ShutdownWriteCompleted();
await connection.CloseAsync(errorCode: 0);
});
await (new[] { listenTask, clientTask }).WhenAllOrAnyFailed(millisecondsTimeout: 1000000);
}
}
[Fact]
public async Task TestStreams()
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册