未验证 提交 81d3a993 编写于 作者: E Eirik Tsarpalis 提交者: GitHub

System.Text.Json: Add IAsyncEnumerable support (#50778)

* implement IAsyncEnumerable JsonConverter

* Prototype of IAsyncEnumerable deserialize with Stream

* Use a Queue + test buffersizes

* Avoid 1 item lag

* Add support for Serialize

* Misc cleanup on test

* extend DeserializeAsyncEnumerable test coverage

also removes SerializeAsyncEnumerable components

* Update src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableConverterFactory.cs
Co-authored-by: NStephen Toub <stoub@microsoft.com>

* address feedback

* tweak test buffer values

* Update src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs
Co-authored-by: NStephen Toub <stoub@microsoft.com>

* Update src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs
Co-authored-by: NStephen Toub <stoub@microsoft.com>

* address feedback

* increase delayInterval in serialization tests

* address feedback

* address feedback

* add test on exceptional IAsyncDisposable disposal

* address feedback

* Update src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadAsyncState.cs
Co-authored-by: NLayomi Akinrinade <layomia@gmail.com>

* Update src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStack.cs
Co-authored-by: NLayomi Akinrinade <layomia@gmail.com>

* fix build and remove dead code

* address feedback

* Revert unneeded JsonClassInfo.ElementType workaround

* remove state allocation on async deserialization methods

* remove tooling artifacts

* address feedback

* reset AsyncEnumeratorIsPendingCompletion field
Co-authored-by: NSteve Harter <steveharter@users.noreply.github.com>
Co-authored-by: NStephen Toub <stoub@microsoft.com>
Co-authored-by: NLayomi Akinrinade <layomia@gmail.com>
上级 11197256
......@@ -192,6 +192,7 @@ public static partial class JsonSerializer
public static object? Deserialize(ref System.Text.Json.Utf8JsonReader reader, [System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] System.Type returnType, System.Text.Json.JsonSerializerOptions? options = null) { throw null; }
public static System.Threading.Tasks.ValueTask<object?> DeserializeAsync(System.IO.Stream utf8Json, [System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] System.Type returnType, System.Text.Json.JsonSerializerOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public static System.Threading.Tasks.ValueTask<TValue?> DeserializeAsync<[System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] TValue>(System.IO.Stream utf8Json, System.Text.Json.JsonSerializerOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public static System.Collections.Generic.IAsyncEnumerable<TValue?> DeserializeAsyncEnumerable<[System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] TValue>(System.IO.Stream utf8Json, System.Text.Json.JsonSerializerOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public static TValue? Deserialize<[System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] TValue>(System.ReadOnlySpan<byte> utf8Json, System.Text.Json.JsonSerializerOptions? options = null) { throw null; }
public static TValue? Deserialize<[System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] TValue>(string json, System.Text.Json.JsonSerializerOptions? options = null) { throw null; }
public static TValue? Deserialize<[System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] TValue>(System.ReadOnlySpan<char> json, System.Text.Json.JsonSerializerOptions? options = null) { throw null; }
......
......@@ -357,6 +357,9 @@
<data name="SerializationNotSupportedType" xml:space="preserve">
<value>The type '{0}' is not supported.</value>
</data>
<data name="TypeRequiresAsyncSerialization" xml:space="preserve">
<value>The type '{0}' can only be serialized using async serialization methods.</value>
</data>
<data name="InvalidCharacterAtStartOfComment" xml:space="preserve">
<value>'{0}' is invalid after '/' at the beginning of the comment. Expected either '/' or '*'.</value>
</data>
......@@ -557,4 +560,4 @@
<data name="SerializerConverterFactoryReturnsJsonConverterFactory" xml:space="preserve">
<value>The converter '{0}' cannot return an instance of JsonConverterFactory.</value>
</data>
</root>
\ No newline at end of file
</root>
......@@ -73,6 +73,8 @@
<Compile Include="System\Text\Json\Serialization\Converters\Collection\ConcurrentStackOfTConverter.cs" />
<Compile Include="System\Text\Json\Serialization\Converters\Collection\DictionaryDefaultConverter.cs" />
<Compile Include="System\Text\Json\Serialization\Converters\Collection\DictionaryOfTKeyTValueConverter.cs" />
<Compile Include="System\Text\Json\Serialization\Converters\Collection\IAsyncEnumerableConverterFactory.cs" />
<Compile Include="System\Text\Json\Serialization\Converters\Collection\IAsyncEnumerableOfTConverter.cs" />
<Compile Include="System\Text\Json\Serialization\Converters\Collection\ICollectionOfTConverter.cs" />
<Compile Include="System\Text\Json\Serialization\Converters\Collection\IDictionaryConverter.cs" />
<Compile Include="System\Text\Json\Serialization\Converters\Collection\IDictionaryOfTKeyTValueConverter.cs" />
......@@ -173,6 +175,7 @@
<Compile Include="System\Text\Json\Serialization\PreserveReferenceHandler.cs" />
<Compile Include="System\Text\Json\Serialization\PreserveReferenceResolver.cs" />
<Compile Include="System\Text\Json\Serialization\PropertyRef.cs" />
<Compile Include="System\Text\Json\Serialization\ReadAsyncBufferState.cs" />
<Compile Include="System\Text\Json\Serialization\ReadStack.cs" />
<Compile Include="System\Text\Json\Serialization\ReadStackFrame.cs" />
<Compile Include="System\Text\Json\Serialization\ReferenceHandler.cs" />
......@@ -233,11 +236,9 @@
<ItemGroup Condition="$(TargetFramework.StartsWith('netstandard')) or $(TargetFramework.StartsWith('net4'))">
<Compile Include="System\Collections\Generic\StackExtensions.netstandard.cs" />
<!-- Common or Common-branched source files -->
<Compile Include="$(CommonPath)System\Buffers\ArrayBufferWriter.cs"
Link="Common\System\Buffers\ArrayBufferWriter.cs" />
<Compile Include="$(CommonPath)System\Buffers\ArrayBufferWriter.cs" Link="Common\System\Buffers\ArrayBufferWriter.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)' or
'$(TargetFramework)' == 'netcoreapp3.0'">
<ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)' or '$(TargetFramework)' == 'netcoreapp3.0'">
<Reference Include="System.Buffers" />
<Reference Include="System.Collections" />
<Reference Include="System.Collections.Concurrent" />
......
......@@ -20,7 +20,7 @@ internal enum ClassType : byte
Value = 0x2,
// JsonValueConverter<> - simple values that need to re-enter the serializer such as KeyValuePair<TKey, TValue>.
NewValue = 0x4,
// JsonIEnumerbleConverter<> - all enumerable collections except dictionaries.
// JsonIEnumerableConverter<> - all enumerable collections except dictionaries.
Enumerable = 0x8,
// JsonDictionaryConverter<,> - dictionary types.
Dictionary = 0x10,
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Text.Json.Serialization.Converters;
namespace System.Text.Json.Serialization
{
/// <summary>
/// Converter for streaming <see cref="IAsyncEnumerable{T}" /> values.
/// </summary>
internal sealed class IAsyncEnumerableConverterFactory : JsonConverterFactory
{
public override bool CanConvert(Type typeToConvert) => GetAsyncEnumerableInterface(typeToConvert) is not null;
public override JsonConverter CreateConverter(Type typeToConvert, JsonSerializerOptions options)
{
Type? asyncEnumerableInterface = GetAsyncEnumerableInterface(typeToConvert);
Debug.Assert(asyncEnumerableInterface is not null, $"{typeToConvert} not supported by converter.");
Type elementType = asyncEnumerableInterface.GetGenericArguments()[0];
Type converterType = typeof(IAsyncEnumerableOfTConverter<,>).MakeGenericType(typeToConvert, elementType);
return (JsonConverter)Activator.CreateInstance(converterType)!;
}
private static Type? GetAsyncEnumerableInterface(Type type)
=> IEnumerableConverterFactoryHelpers.GetCompatibleGenericInterface(type, typeof(IAsyncEnumerable<>));
}
}
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace System.Text.Json.Serialization.Converters
{
internal sealed class IAsyncEnumerableOfTConverter<TAsyncEnumerable, TElement>
: IEnumerableDefaultConverter<TAsyncEnumerable, TElement>
where TAsyncEnumerable : IAsyncEnumerable<TElement>
{
internal override bool OnTryRead(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options, ref ReadStack state, out TAsyncEnumerable value)
{
if (!typeToConvert.IsAssignableFrom(typeof(IAsyncEnumerable<TElement>)))
{
ThrowHelper.ThrowNotSupportedException_CannotPopulateCollection(TypeToConvert, ref reader, ref state);
}
return base.OnTryRead(ref reader, typeToConvert, options, ref state, out value!);
}
protected override void Add(in TElement value, ref ReadStack state)
{
((BufferedAsyncEnumerable)state.Current.ReturnValue!)._buffer.Add(value);
}
protected override void CreateCollection(ref Utf8JsonReader reader, ref ReadStack state, JsonSerializerOptions options)
{
state.Current.ReturnValue = new BufferedAsyncEnumerable();
}
internal override bool OnTryWrite(Utf8JsonWriter writer, TAsyncEnumerable value, JsonSerializerOptions options, ref WriteStack state)
{
if (!state.SupportContinuation)
{
ThrowHelper.ThrowNotSupportedException_TypeRequiresAsyncSerialization(TypeToConvert);
}
return base.OnTryWrite(writer, value, options, ref state);
}
[Diagnostics.CodeAnalysis.SuppressMessage("Reliability", "CA2012:Use ValueTasks correctly", Justification = "Converter needs to consume ValueTask's in a non-async context")]
protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable value, JsonSerializerOptions options, ref WriteStack state)
{
IAsyncEnumerator<TElement> enumerator;
ValueTask<bool> moveNextTask;
if (state.Current.AsyncEnumerator is null)
{
enumerator = value.GetAsyncEnumerator(state.CancellationToken);
moveNextTask = enumerator.MoveNextAsync();
// we always need to attach the enumerator to the stack
// since it will need to be disposed asynchronously.
state.Current.AsyncEnumerator = enumerator;
}
else
{
Debug.Assert(state.Current.AsyncEnumerator is IAsyncEnumerator<TElement>);
enumerator = (IAsyncEnumerator<TElement>)state.Current.AsyncEnumerator;
if (state.Current.AsyncEnumeratorIsPendingCompletion)
{
// converter was previously suspended due to a pending MoveNextAsync() task
Debug.Assert(state.PendingTask is Task<bool> && state.PendingTask.IsCompleted);
moveNextTask = new ValueTask<bool>((Task<bool>)state.PendingTask);
state.Current.AsyncEnumeratorIsPendingCompletion = false;
state.PendingTask = null;
}
else
{
// converter was suspended for a different reason;
// the last MoveNextAsync() call can only have completed with 'true'.
moveNextTask = new ValueTask<bool>(true);
}
}
JsonConverter<TElement> converter = GetElementConverter(ref state);
// iterate through the enumerator while elements are being returned synchronously
for (; moveNextTask.IsCompleted; moveNextTask = enumerator.MoveNextAsync())
{
if (!moveNextTask.Result)
{
return true;
}
if (ShouldFlush(writer, ref state))
{
return false;
}
TElement element = enumerator.Current;
if (!converter.TryWrite(writer, element, options, ref state))
{
return false;
}
}
// we have a pending MoveNextAsync() call;
// wrap inside a regular task so that it can be awaited multiple times;
// mark the current stackframe as pending completion.
Debug.Assert(state.PendingTask is null);
state.PendingTask = moveNextTask.AsTask();
state.Current.AsyncEnumeratorIsPendingCompletion = true;
return false;
}
private sealed class BufferedAsyncEnumerable : IAsyncEnumerable<TElement>
{
public readonly List<TElement> _buffer = new();
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
public async IAsyncEnumerator<TElement> GetAsyncEnumerator(CancellationToken _)
{
foreach (TElement element in _buffer)
{
yield return element;
}
}
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
}
}
}
......@@ -218,7 +218,7 @@ protected static JsonConverter<TElement> GetElementConverter(ref WriteStack stat
return true;
}
internal sealed override bool OnTryWrite(
internal override bool OnTryWrite(
Utf8JsonWriter writer,
TCollection value,
JsonSerializerOptions options,
......
......@@ -36,7 +36,15 @@ public sealed override void Write(Utf8JsonWriter writer, T value, JsonSerializer
WriteStack state = default;
state.Initialize(typeof(T), options, supportContinuation: false);
TryWrite(writer, value, options, ref state);
try
{
TryWrite(writer, value, options, ref state);
}
catch
{
state.DisposePendingDisposablesOnException();
throw;
}
}
public sealed override bool HandleNull => false;
......
......@@ -2,9 +2,11 @@
// The .NET Foundation licenses this file to you under the MIT license.
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
......@@ -45,7 +47,7 @@ public static partial class JsonSerializer
throw new ArgumentNullException(nameof(utf8Json));
}
return ReadAsync<TValue>(utf8Json, typeof(TValue), options, cancellationToken);
return ReadAllAsync<TValue>(utf8Json, typeof(TValue), options, cancellationToken);
}
/// <summary>
......@@ -83,137 +85,207 @@ public static partial class JsonSerializer
if (returnType == null)
throw new ArgumentNullException(nameof(returnType));
return ReadAsync<object?>(utf8Json, returnType, options, cancellationToken);
return ReadAllAsync<object>(utf8Json, returnType, options, cancellationToken);
}
private static async ValueTask<TValue?> ReadAsync<TValue>(
/// <summary>
/// Wraps the UTF-8 encoded text into an <see cref="IAsyncEnumerable{TValue}" />
/// that can be used to deserialize root-level JSON arrays in a streaming manner.
/// </summary>
/// <returns>An <see cref="IAsyncEnumerable{TValue}" /> representation of the provided JSON array.</returns>
/// <param name="utf8Json">JSON data to parse.</param>
/// <param name="options">Options to control the behavior during reading.</param>
/// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> which may be used to cancel the read operation.</param>
/// <returns>An <typeparamref name="TValue"/> representation of the JSON value.</returns>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="utf8Json"/> is <see langword="null"/>.
/// </exception>
public static IAsyncEnumerable<TValue> DeserializeAsyncEnumerable<[DynamicallyAccessedMembers(MembersAccessedOnRead)] TValue>(
Stream utf8Json,
Type returnType,
JsonSerializerOptions? options,
CancellationToken cancellationToken)
JsonSerializerOptions? options = null,
CancellationToken cancellationToken = default)
{
if (options == null)
if (utf8Json == null)
{
options = JsonSerializerOptions.s_defaultOptions;
throw new ArgumentNullException(nameof(utf8Json));
}
ReadStack state = default;
state.Initialize(returnType, options, supportContinuation: true);
options ??= JsonSerializerOptions.s_defaultOptions;
return CreateAsyncEnumerableDeserializer(utf8Json, options, cancellationToken);
JsonConverter converter = state.Current.JsonPropertyInfo!.ConverterBase;
static async IAsyncEnumerable<TValue> CreateAsyncEnumerableDeserializer(
Stream utf8Json,
JsonSerializerOptions options,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var bufferState = new ReadAsyncBufferState(options.DefaultBufferSize);
ReadStack readStack = default;
readStack.Initialize(typeof(Queue<TValue>), options, supportContinuation: true);
JsonConverter converter = readStack.Current.JsonPropertyInfo!.ConverterBase;
var jsonReaderState = new JsonReaderState(options.GetReaderOptions());
var readerState = new JsonReaderState(options.GetReaderOptions());
try
{
do
{
bufferState = await ReadFromStreamAsync(utf8Json, bufferState, cancellationToken).ConfigureAwait(false);
ContinueDeserialize<Queue<TValue>>(ref bufferState, ref jsonReaderState, ref readStack, converter, options);
if (readStack.Current.ReturnValue is Queue<TValue> queue)
{
while (queue.Count > 0)
{
yield return queue.Dequeue();
}
}
}
while (!bufferState.IsFinalBlock);
}
finally
{
bufferState.Dispose();
}
}
}
// todo: https://github.com/dotnet/runtime/issues/32355
int utf8BomLength = JsonConstants.Utf8Bom.Length;
byte[] buffer = ArrayPool<byte>.Shared.Rent(Math.Max(options.DefaultBufferSize, utf8BomLength));
int bytesInBuffer = 0;
long totalBytesRead = 0;
int clearMax = 0;
bool isFirstIteration = true;
internal static async ValueTask<TValue?> ReadAllAsync<TValue>(
Stream utf8Json,
Type inputType,
JsonSerializerOptions? options,
CancellationToken cancellationToken)
{
options ??= JsonSerializerOptions.s_defaultOptions;
var asyncState = new ReadAsyncBufferState(options.DefaultBufferSize);
ReadStack readStack = default;
readStack.Initialize(inputType, options, supportContinuation: true);
JsonConverter converter = readStack.Current.JsonPropertyInfo!.ConverterBase;
var jsonReaderState = new JsonReaderState(options.GetReaderOptions());
try
{
while (true)
{
// Read from the stream until either our buffer is filled or we hit EOF.
// Calling ReadCore is relatively expensive, so we minimize the number of times
// we need to call it.
bool isFinalBlock = false;
while (true)
asyncState = await ReadFromStreamAsync(utf8Json, asyncState, cancellationToken).ConfigureAwait(false);
TValue value = ContinueDeserialize<TValue>(ref asyncState, ref jsonReaderState, ref readStack, converter, options);
if (asyncState.IsFinalBlock)
{
int bytesRead = await utf8Json.ReadAsync(
return value!;
}
}
}
finally
{
asyncState.Dispose();
}
}
/// <summary>
/// Read from the stream until either our buffer is filled or we hit EOF.
/// Calling ReadCore is relatively expensive, so we minimize the number of times
/// we need to call it.
/// </summary>
internal static async ValueTask<ReadAsyncBufferState> ReadFromStreamAsync(
Stream utf8Json,
ReadAsyncBufferState bufferState,
CancellationToken cancellationToken)
{
while (true)
{
int bytesRead = await utf8Json.ReadAsync(
#if BUILDING_INBOX_LIBRARY
buffer.AsMemory(bytesInBuffer),
bufferState.Buffer.AsMemory(bufferState.BytesInBuffer),
#else
buffer, bytesInBuffer, buffer.Length - bytesInBuffer,
bufferState.Buffer, bufferState.BytesInBuffer, bufferState.Buffer.Length - bufferState.BytesInBuffer,
#endif
cancellationToken).ConfigureAwait(false);
cancellationToken).ConfigureAwait(false);
if (bytesRead == 0)
{
isFinalBlock = true;
break;
}
if (bytesRead == 0)
{
bufferState.IsFinalBlock = true;
break;
}
totalBytesRead += bytesRead;
bytesInBuffer += bytesRead;
bufferState.BytesInBuffer += bytesRead;
if (bytesInBuffer == buffer.Length)
{
break;
}
}
if (bytesInBuffer > clearMax)
{
clearMax = bytesInBuffer;
}
if (bufferState.BytesInBuffer == bufferState.Buffer.Length)
{
break;
}
}
int start = 0;
if (isFirstIteration)
{
isFirstIteration = false;
return bufferState;
}
// Handle the UTF-8 BOM if present
Debug.Assert(buffer.Length >= JsonConstants.Utf8Bom.Length);
if (buffer.AsSpan().StartsWith(JsonConstants.Utf8Bom))
{
start += utf8BomLength;
bytesInBuffer -= utf8BomLength;
}
}
internal static TValue ContinueDeserialize<TValue>(
ref ReadAsyncBufferState bufferState,
ref JsonReaderState jsonReaderState,
ref ReadStack readStack,
JsonConverter converter,
JsonSerializerOptions options)
{
if (bufferState.BytesInBuffer > bufferState.ClearMax)
{
bufferState.ClearMax = bufferState.BytesInBuffer;
}
// Process the data available
TValue value = ReadCore<TValue>(
ref readerState,
isFinalBlock,
new ReadOnlySpan<byte>(buffer, start, bytesInBuffer),
options,
ref state,
converter);
int start = 0;
if (bufferState.IsFirstIteration)
{
bufferState.IsFirstIteration = false;
Debug.Assert(state.BytesConsumed <= bytesInBuffer);
int bytesConsumed = checked((int)state.BytesConsumed);
// Handle the UTF-8 BOM if present
Debug.Assert(bufferState.Buffer.Length >= JsonConstants.Utf8Bom.Length);
if (bufferState.Buffer.AsSpan().StartsWith(JsonConstants.Utf8Bom))
{
start += JsonConstants.Utf8Bom.Length;
bufferState.BytesInBuffer -= JsonConstants.Utf8Bom.Length;
}
}
bytesInBuffer -= bytesConsumed;
// Process the data available
TValue value = ReadCore<TValue>(
ref jsonReaderState,
bufferState.IsFinalBlock,
new ReadOnlySpan<byte>(bufferState.Buffer, start, bufferState.BytesInBuffer),
options,
ref readStack,
converter);
if (isFinalBlock)
{
// The reader should have thrown if we have remaining bytes.
Debug.Assert(bytesInBuffer == 0);
Debug.Assert(readStack.BytesConsumed <= bufferState.BytesInBuffer);
int bytesConsumed = checked((int)readStack.BytesConsumed);
return value;
}
bufferState.BytesInBuffer -= bytesConsumed;
// Check if we need to shift or expand the buffer because there wasn't enough data to complete deserialization.
if ((uint)bytesInBuffer > ((uint)buffer.Length / 2))
{
// We have less than half the buffer available, double the buffer size.
byte[] dest = ArrayPool<byte>.Shared.Rent((buffer.Length < (int.MaxValue / 2)) ? buffer.Length * 2 : int.MaxValue);
// The reader should have thrown if we have remaining bytes.
Debug.Assert(!bufferState.IsFinalBlock || bufferState.BytesInBuffer == 0);
// Copy the unprocessed data to the new buffer while shifting the processed bytes.
Buffer.BlockCopy(buffer, bytesConsumed + start, dest, 0, bytesInBuffer);
if (!bufferState.IsFinalBlock)
{
// Check if we need to shift or expand the buffer because there wasn't enough data to complete deserialization.
if ((uint)bufferState.BytesInBuffer > ((uint)bufferState.Buffer.Length / 2))
{
// We have less than half the buffer available, double the buffer size.
byte[] oldBuffer = bufferState.Buffer;
int oldClearMax = bufferState.ClearMax;
byte[] newBuffer = ArrayPool<byte>.Shared.Rent((bufferState.Buffer.Length < (int.MaxValue / 2)) ? bufferState.Buffer.Length * 2 : int.MaxValue);
new Span<byte>(buffer, 0, clearMax).Clear();
ArrayPool<byte>.Shared.Return(buffer);
// Copy the unprocessed data to the new buffer while shifting the processed bytes.
Buffer.BlockCopy(oldBuffer, bytesConsumed + start, newBuffer, 0, bufferState.BytesInBuffer);
bufferState.Buffer = newBuffer;
bufferState.ClearMax = bufferState.BytesInBuffer;
clearMax = bytesInBuffer;
buffer = dest;
}
else if (bytesInBuffer != 0)
{
// Shift the processed bytes to the beginning of buffer to make more room.
Buffer.BlockCopy(buffer, bytesConsumed + start, buffer, 0, bytesInBuffer);
}
// Clear and return the old buffer
new Span<byte>(oldBuffer, 0, oldClearMax).Clear();
ArrayPool<byte>.Shared.Return(oldBuffer);
}
else if (bufferState.BytesInBuffer != 0)
{
// Shift the processed bytes to the beginning of buffer to make more room.
Buffer.BlockCopy(bufferState.Buffer, bytesConsumed + start, bufferState.Buffer, 0, bufferState.BytesInBuffer);
}
}
finally
{
// Clear only what we used and return the buffer to the pool
new Span<byte>(buffer, 0, clearMax).Clear();
ArrayPool<byte>.Shared.Return(buffer);
}
return value;
}
private static TValue ReadCore<TValue>(
......@@ -234,7 +306,6 @@ public static partial class JsonSerializer
state.BytesConsumed = 0;
TValue? value = ReadCore<TValue>(converterBase, ref reader, options, ref state);
readerState = reader.CurrentState;
return value!;
}
......
......@@ -29,8 +29,16 @@ public static partial class JsonSerializer
WriteStack state = default;
JsonConverter jsonConverter = state.Initialize(inputType, options, supportContinuation: false);
bool success = WriteCore(jsonConverter, writer, value, options, ref state);
Debug.Assert(success);
try
{
bool success = WriteCore(jsonConverter, writer, value, options, ref state);
Debug.Assert(success);
}
catch
{
state.DisposePendingDisposablesOnException();
throw;
}
}
private static bool WriteCore<TValue>(
......
......@@ -113,21 +113,52 @@ public static partial class JsonSerializer
inputType = value!.GetType();
}
WriteStack state = default;
WriteStack state = new WriteStack { CancellationToken = cancellationToken };
JsonConverter converterBase = state.Initialize(inputType, options, supportContinuation: true);
bool isFinalBlock;
do
try
{
state.FlushThreshold = (int)(bufferWriter.Capacity * FlushThreshold);
isFinalBlock = WriteCore(converterBase, writer, value, options, ref state);
await bufferWriter.WriteToStreamAsync(utf8Json, cancellationToken).ConfigureAwait(false);
bufferWriter.Clear();
} while (!isFinalBlock);
do
{
state.FlushThreshold = (int)(bufferWriter.Capacity * FlushThreshold);
try
{
isFinalBlock = WriteCore(converterBase, writer, value, options, ref state);
}
finally
{
if (state.PendingAsyncDisposables?.Count > 0)
{
await state.DisposePendingAsyncDisposables().ConfigureAwait(false);
}
}
await bufferWriter.WriteToStreamAsync(utf8Json, cancellationToken).ConfigureAwait(false);
bufferWriter.Clear();
if (state.PendingTask is not null)
{
try
{
await state.PendingTask.ConfigureAwait(false);
}
catch
{
// Exceptions will be propagated elsewhere
// TODO https://github.com/dotnet/runtime/issues/22144
}
}
} while (!isFinalBlock);
}
catch
{
await state.DisposePendingDisposablesOnExceptionAsync().ConfigureAwait(false);
throw;
}
}
}
}
......
......@@ -26,6 +26,8 @@ public sealed partial class JsonSerializerOptions
// Nullable converter should always be next since it forwards to any nullable type.
new NullableConverterFactory(),
new EnumConverterFactory(),
// IAsyncEnumerable takes precedence over IEnumerable.
new IAsyncEnumerableConverterFactory(),
// IEnumerable should always be second to last since they can convert any IEnumerable.
new IEnumerableConverterFactory(),
// Object should always be last since it converts any type.
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Buffers;
using System.Diagnostics;
using System.Threading;
namespace System.Text.Json.Serialization
{
internal struct ReadAsyncBufferState : IDisposable
{
public byte[] Buffer;
public int BytesInBuffer;
public int ClearMax;
public bool IsFirstIteration;
public bool IsFinalBlock;
public ReadAsyncBufferState(int defaultBufferSize)
{
Buffer = ArrayPool<byte>.Shared.Rent(Math.Max(defaultBufferSize, JsonConstants.Utf8Bom.Length));
BytesInBuffer = ClearMax = 0;
IsFirstIteration = true;
IsFinalBlock = false;
}
public void Dispose()
{
// Clear only what we used and return the buffer to the pool
new Span<byte>(Buffer, 0, ClearMax).Clear();
ArrayPool<byte>.Shared.Return(Buffer);
Buffer = null!;
}
}
}
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.ExceptionServices;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
namespace System.Text.Json
{
......@@ -20,6 +24,22 @@ internal struct WriteStack
/// </summary>
private int _count;
/// <summary>
/// Cancellation token used by converters performing async serialization (e.g. IAsyncEnumerable)
/// </summary>
public CancellationToken CancellationToken;
/// <summary>
/// Stores a pending task that a resumable converter depends on to continue work.
/// It must be awaited by the root context before serialization is resumed.
/// </summary>
public Task? PendingTask;
/// <summary>
/// List of IAsyncDisposables that have been scheduled for disposal by converters.
/// </summary>
public List<IAsyncDisposable>? PendingAsyncDisposables;
private List<WriteStackFrame> _previous;
// A field is used instead of a property to avoid value semantics.
......@@ -172,6 +192,14 @@ public void Pop(bool success)
else
{
Debug.Assert(_continuationCount == 0);
if (Current.AsyncEnumerator is not null)
{
// we have completed serialization of an AsyncEnumerator,
// pop from the stack and schedule for async disposal.
PendingAsyncDisposables ??= new List<IAsyncDisposable>();
PendingAsyncDisposables.Add(Current.AsyncEnumerator);
}
}
if (_count > 1)
......@@ -180,6 +208,122 @@ public void Pop(bool success)
}
}
// Asynchronously dispose of any AsyncDisposables that have been scheduled for disposal
public async ValueTask DisposePendingAsyncDisposables()
{
Debug.Assert(PendingAsyncDisposables?.Count > 0);
Exception? exception = null;
foreach (IAsyncDisposable asyncDisposable in PendingAsyncDisposables)
{
try
{
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
}
catch (Exception e)
{
exception = e;
}
}
if (exception is not null)
{
ExceptionDispatchInfo.Capture(exception).Throw();
}
PendingAsyncDisposables.Clear();
}
/// <summary>
/// Walks the stack cleaning up any leftover IDisposables
/// in the event of an exception on serialization
/// </summary>
public void DisposePendingDisposablesOnException()
{
Exception? exception = null;
Debug.Assert(Current.AsyncEnumerator is null);
DisposeFrame(Current.CollectionEnumerator, ref exception);
int stackSize = Math.Max(_count, _continuationCount);
if (stackSize > 1)
{
for (int i = 0; i < stackSize - 1; i++)
{
Debug.Assert(_previous[i].AsyncEnumerator is null);
DisposeFrame(_previous[i].CollectionEnumerator, ref exception);
}
}
if (exception is not null)
{
ExceptionDispatchInfo.Capture(exception).Throw();
}
static void DisposeFrame(IEnumerator? collectionEnumerator, ref Exception? exception)
{
try
{
if (collectionEnumerator is IDisposable disposable)
{
disposable.Dispose();
}
}
catch (Exception e)
{
exception = e;
}
}
}
/// <summary>
/// Walks the stack cleaning up any leftover I(Async)Disposables
/// in the event of an exception on async serialization
/// </summary>
public async ValueTask DisposePendingDisposablesOnExceptionAsync()
{
Exception? exception = null;
exception = await DisposeFrame(Current.CollectionEnumerator, Current.AsyncEnumerator, exception).ConfigureAwait(false);
int stackSize = Math.Max(_count, _continuationCount);
if (stackSize > 1)
{
for (int i = 0; i < stackSize - 1; i++)
{
exception = await DisposeFrame(_previous[i].CollectionEnumerator, _previous[i].AsyncEnumerator, exception).ConfigureAwait(false);
}
}
if (exception is not null)
{
ExceptionDispatchInfo.Capture(exception).Throw();
}
static async ValueTask<Exception?> DisposeFrame(IEnumerator? collectionEnumerator, IAsyncDisposable? asyncDisposable, Exception? exception)
{
Debug.Assert(!(collectionEnumerator is not null && asyncDisposable is not null));
try
{
if (collectionEnumerator is IDisposable disposable)
{
disposable.Dispose();
}
else if (asyncDisposable is not null)
{
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
}
}
catch (Exception e)
{
exception = e;
}
return exception;
}
}
// Return a property path as a simple JSONPath using dot-notation when possible. When special characters are present, bracket-notation is used:
// $.x.y.z
// $['PropertyName.With.Special.Chars']
......
......@@ -3,6 +3,7 @@
using System.Collections;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Text.Json.Serialization;
namespace System.Text.Json
......@@ -15,6 +16,17 @@ internal struct WriteStackFrame
/// </summary>
public IEnumerator? CollectionEnumerator;
/// <summary>
/// The enumerator for resumable async disposables.
/// </summary>
public IAsyncDisposable? AsyncEnumerator;
/// <summary>
/// The current stackframe has suspended serialization due to a pending task,
/// stored in the <see cref="WriteStack.PendingTask"/> property.
/// </summary>
public bool AsyncEnumeratorIsPendingCompletion;
/// <summary>
/// The original JsonPropertyInfo that is not changed. It contains all properties.
/// </summary>
......@@ -113,6 +125,8 @@ public void Reset()
{
CollectionEnumerator = null;
EnumeratorIndex = 0;
AsyncEnumerator = null;
AsyncEnumeratorIsPendingCompletion = false;
IgnoreDictionaryKeyPolicy = false;
JsonClassInfo = null!;
OriginalDepth = 0;
......
......@@ -26,6 +26,13 @@ public static void ThrowNotSupportedException_SerializationNotSupported(Type pro
throw new NotSupportedException(SR.Format(SR.SerializationNotSupportedType, propertyType));
}
[DoesNotReturn]
[MethodImpl(MethodImplOptions.NoInlining)]
public static void ThrowNotSupportedException_TypeRequiresAsyncSerialization(Type propertyType)
{
throw new NotSupportedException(SR.Format(SR.TypeRequiresAsyncSerialization, propertyType));
}
[DoesNotReturn]
[MethodImpl(MethodImplOptions.NoInlining)]
public static void ThrowNotSupportedException_ConstructorMaxOf64Parameters(ConstructorInfo constructorInfo, Type type)
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.Text.Json.Tests.Serialization
{
public static partial class CollectionTests
{
[Theory]
[MemberData(nameof(GetAsyncEnumerableSources))]
public static async Task WriteRootLevelAsyncEnumerable<TElement>(IEnumerable<TElement> source, int delayInterval, int bufferSize)
{
JsonSerializerOptions options = new JsonSerializerOptions
{
DefaultBufferSize = bufferSize
};
string expectedJson = JsonSerializer.Serialize(source);
using var stream = new Utf8MemoryStream();
var asyncEnumerable = new MockedAsyncEnumerable<TElement>(source, delayInterval);
await JsonSerializer.SerializeAsync(stream, asyncEnumerable, options);
JsonTestHelper.AssertJsonEqual(expectedJson, stream.ToString());
Assert.Equal(1, asyncEnumerable.TotalCreatedEnumerators);
Assert.Equal(1, asyncEnumerable.TotalDisposedEnumerators);
}
[Theory]
[MemberData(nameof(GetAsyncEnumerableSources))]
public static async Task WriteNestedAsyncEnumerable<TElement>(IEnumerable<TElement> source, int delayInterval, int bufferSize)
{
JsonSerializerOptions options = new JsonSerializerOptions
{
DefaultBufferSize = bufferSize
};
string expectedJson = JsonSerializer.Serialize(new { Data = source });
using var stream = new Utf8MemoryStream();
var asyncEnumerable = new MockedAsyncEnumerable<TElement>(source, delayInterval);
await JsonSerializer.SerializeAsync(stream, new { Data = asyncEnumerable }, options);
JsonTestHelper.AssertJsonEqual(expectedJson, stream.ToString());
Assert.Equal(1, asyncEnumerable.TotalCreatedEnumerators);
Assert.Equal(1, asyncEnumerable.TotalDisposedEnumerators);
}
[Theory]
[MemberData(nameof(GetAsyncEnumerableSources))]
public static async Task WriteNestedAsyncEnumerable_DTO<TElement>(IEnumerable<TElement> source, int delayInterval, int bufferSize)
{
JsonSerializerOptions options = new JsonSerializerOptions
{
DefaultBufferSize = bufferSize
};
string expectedJson = JsonSerializer.Serialize(new { Data = source });
using var stream = new Utf8MemoryStream();
var asyncEnumerable = new MockedAsyncEnumerable<TElement>(source, delayInterval);
await JsonSerializer.SerializeAsync(stream, new AsyncEnumerableDto<TElement> { Data = asyncEnumerable }, options);
JsonTestHelper.AssertJsonEqual(expectedJson, stream.ToString());
Assert.Equal(1, asyncEnumerable.TotalCreatedEnumerators);
Assert.Equal(1, asyncEnumerable.TotalDisposedEnumerators);
}
[Fact, OuterLoop]
public static async Task WriteAsyncEnumerable_LongRunningEnumeration_Cancellation()
{
var longRunningEnumerable = new MockedAsyncEnumerable<int>(
source: Enumerable.Range(1, 100),
delayInterval: 1,
delay: TimeSpan.FromMinutes(1));
using var utf8Stream = new Utf8MemoryStream();
using var cts = new CancellationTokenSource(delay: TimeSpan.FromSeconds(5));
await Assert.ThrowsAsync<TaskCanceledException>(async () =>
await JsonSerializer.SerializeAsync(utf8Stream, longRunningEnumerable, cancellationToken: cts.Token));
Assert.Equal(1, longRunningEnumerable.TotalCreatedEnumerators);
Assert.Equal(1, longRunningEnumerable.TotalDisposedEnumerators);
}
public class AsyncEnumerableDto<TElement>
{
public IAsyncEnumerable<TElement> Data { get; set; }
}
[Theory]
[MemberData(nameof(GetAsyncEnumerableSources))]
public static async Task WriteSequentialNestedAsyncEnumerables<TElement>(IEnumerable<TElement> source, int delayInterval, int bufferSize)
{
JsonSerializerOptions options = new JsonSerializerOptions
{
DefaultBufferSize = bufferSize
};
string expectedJson = JsonSerializer.Serialize(new { Data1 = source, Data2 = source });
using var stream = new Utf8MemoryStream();
var asyncEnumerable = new MockedAsyncEnumerable<TElement>(source, delayInterval);
await JsonSerializer.SerializeAsync(stream, new { Data1 = asyncEnumerable, Data2 = asyncEnumerable }, options);
JsonTestHelper.AssertJsonEqual(expectedJson, stream.ToString());
Assert.Equal(2, asyncEnumerable.TotalCreatedEnumerators);
Assert.Equal(2, asyncEnumerable.TotalDisposedEnumerators);
}
[Theory]
[MemberData(nameof(GetAsyncEnumerableSources))]
public static async Task WriteAsyncEnumerableOfAsyncEnumerables<TElement>(IEnumerable<TElement> source, int delayInterval, int bufferSize)
{
JsonSerializerOptions options = new JsonSerializerOptions
{
DefaultBufferSize = bufferSize
};
const int OuterEnumerableCount = 5;
string expectedJson = JsonSerializer.Serialize(Enumerable.Repeat(source, OuterEnumerableCount));
var innerAsyncEnumerable = new MockedAsyncEnumerable<TElement>(source, delayInterval);
var outerAsyncEnumerable =
new MockedAsyncEnumerable<IAsyncEnumerable<TElement>>(
Enumerable.Repeat(innerAsyncEnumerable, OuterEnumerableCount), delayInterval);
using var stream = new Utf8MemoryStream();
await JsonSerializer.SerializeAsync(stream, outerAsyncEnumerable, options);
JsonTestHelper.AssertJsonEqual(expectedJson, stream.ToString());
Assert.Equal(1, outerAsyncEnumerable.TotalCreatedEnumerators);
Assert.Equal(1, outerAsyncEnumerable.TotalDisposedEnumerators);
Assert.Equal(OuterEnumerableCount, innerAsyncEnumerable.TotalCreatedEnumerators);
Assert.Equal(OuterEnumerableCount, innerAsyncEnumerable.TotalDisposedEnumerators);
}
[Fact]
public static void WriteRootLevelAsyncEnumerableSync_ThrowsNotSupportedException()
{
IAsyncEnumerable<int> asyncEnumerable = new MockedAsyncEnumerable<int>(Enumerable.Range(1, 10));
Assert.Throws<NotSupportedException>(() => JsonSerializer.Serialize(asyncEnumerable));
}
[Fact]
public static void WriteNestedAsyncEnumerableSync_ThrowsNotSupportedException()
{
IAsyncEnumerable<int> asyncEnumerable = new MockedAsyncEnumerable<int>(Enumerable.Range(1, 10));
Assert.Throws<NotSupportedException>(() => JsonSerializer.Serialize(new { Data = asyncEnumerable }));
}
[Fact]
public static async Task WriteAsyncEnumerable_ElementSerializationThrows_ShouldDisposeEnumerator()
{
using var stream = new Utf8MemoryStream();
var asyncEnumerable = new MockedAsyncEnumerable<IEnumerable<int>>(Enumerable.Repeat(ThrowingEnumerable(), 2));
await Assert.ThrowsAsync<DivideByZeroException>(() => JsonSerializer.SerializeAsync(stream, new { Data = asyncEnumerable }));
Assert.Equal(1, asyncEnumerable.TotalCreatedEnumerators);
Assert.Equal(1, asyncEnumerable.TotalDisposedEnumerators);
static IEnumerable<int> ThrowingEnumerable()
{
yield return 0;
throw new DivideByZeroException();
}
}
[Fact]
public static async Task ReadRootLevelAsyncEnumerable()
{
var utf8Stream = new Utf8MemoryStream("[0,1,2,3,4]");
IAsyncEnumerable<int> result = await JsonSerializer.DeserializeAsync<IAsyncEnumerable<int>>(utf8Stream);
Assert.Equal(new int[] { 0, 1, 2, 3, 4 }, await result.ToListAsync());
}
[Fact]
public static async Task ReadNestedAsyncEnumerable()
{
var utf8Stream = new Utf8MemoryStream(@"{ ""Data"" : [0,1,2,3,4] }");
var result = await JsonSerializer.DeserializeAsync<AsyncEnumerableDto<int>>(utf8Stream);
Assert.Equal(new int[] { 0, 1, 2, 3, 4 }, await result.Data.ToListAsync());
}
[Fact]
public static async Task ReadAsyncEnumerableOfAsyncEnumerables()
{
var utf8Stream = new Utf8MemoryStream("[[0,1,2,3,4], []]");
var result = await JsonSerializer.DeserializeAsync<IAsyncEnumerable<IAsyncEnumerable<int>>>(utf8Stream);
var resultArray = await result.ToListAsync();
Assert.Equal(2, resultArray.Count);
Assert.Equal(new int[] { 0, 1, 2, 3, 4 }, await resultArray[0].ToListAsync());
Assert.Equal(Array.Empty<int>(), await resultArray[1].ToListAsync());
}
[Fact]
public static async Task ReadRootLevelAsyncEnumerableDerivative_ThrowsNotSupportedException()
{
var utf8Stream = new Utf8MemoryStream("[0,1,2,3,4]");
await Assert.ThrowsAsync<NotSupportedException>(async () => await JsonSerializer.DeserializeAsync<MockedAsyncEnumerable<int>>(utf8Stream));
}
public static IEnumerable<object[]> GetAsyncEnumerableSources()
{
yield return WrapArgs(Enumerable.Empty<int>(), 0, 1);
yield return WrapArgs(Enumerable.Range(0, 20), 0, 1);
yield return WrapArgs(Enumerable.Range(0, 100), 20, 20);
yield return WrapArgs(Enumerable.Range(0, 1000), 20, 20);
yield return WrapArgs(Enumerable.Range(0, 100).Select(i => $"lorem ipsum dolor: {i}"), 20, 100);
yield return WrapArgs(Enumerable.Range(0, 10).Select(i => new { Field1 = i, Field2 = $"lorem ipsum dolor: {i}", Field3 = i % 2 == 0 }), 3, 100);
yield return WrapArgs(Enumerable.Range(0, 100).Select(i => new { Field1 = i, Field2 = $"lorem ipsum dolor: {i}", Field3 = i % 2 == 0 }), 20, 100);
static object[] WrapArgs<TSource>(IEnumerable<TSource> source, int delayInterval, int bufferSize) => new object[]{ source, delayInterval, bufferSize };
}
private static async Task<List<T>> ToListAsync<T>(this IAsyncEnumerable<T> source)
{
var list = new List<T>();
await foreach (T item in source)
{
list.Add(item);
}
return list;
}
private class MockedAsyncEnumerable<TElement> : IAsyncEnumerable<TElement>, IEnumerable<TElement>
{
private readonly IEnumerable<TElement> _source;
private readonly TimeSpan _delay;
private readonly int _delayInterval;
internal int TotalCreatedEnumerators { get; private set; }
internal int TotalDisposedEnumerators { get; private set; }
internal int TotalEnumeratedElements { get; private set; }
public MockedAsyncEnumerable(IEnumerable<TElement> source, int delayInterval = 0, TimeSpan? delay = null)
{
_source = source;
_delay = delay ?? TimeSpan.FromMilliseconds(20);
_delayInterval = delayInterval;
}
public IAsyncEnumerator<TElement> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new MockedAsyncEnumerator(this, cancellationToken);
}
// Enumerator class required to instrument IAsyncDisposable calls
private class MockedAsyncEnumerator : IAsyncEnumerator<TElement>
{
private readonly MockedAsyncEnumerable<TElement> _enumerable;
private IAsyncEnumerator<TElement> _innerEnumerator;
public MockedAsyncEnumerator(MockedAsyncEnumerable<TElement> enumerable, CancellationToken token)
{
_enumerable = enumerable;
_innerEnumerator = enumerable.GetAsyncEnumeratorInner(token);
}
public TElement Current => _innerEnumerator.Current;
public ValueTask DisposeAsync()
{
_enumerable.TotalDisposedEnumerators++;
return _innerEnumerator.DisposeAsync();
}
public ValueTask<bool> MoveNextAsync() => _innerEnumerator.MoveNextAsync();
}
private async IAsyncEnumerator<TElement> GetAsyncEnumeratorInner(CancellationToken cancellationToken = default)
{
TotalCreatedEnumerators++;
int i = 0;
foreach (TElement element in _source)
{
if (i > 0 && _delayInterval > 0 && i % _delayInterval == 0)
{
await Task.Delay(_delay, cancellationToken);
}
if (cancellationToken.IsCancellationRequested)
{
yield break;
}
TotalEnumeratedElements++;
yield return element;
i++;
}
}
public IEnumerator<TElement> GetEnumerator() => throw new InvalidOperationException("Collection should not be enumerated synchronously.");
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
private class Utf8MemoryStream : MemoryStream
{
public Utf8MemoryStream() : base()
{
}
public Utf8MemoryStream(string text) : base(Encoding.UTF8.GetBytes(text))
{
}
public override string ToString() => Encoding.UTF8.GetString(ToArray());
}
}
}
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace System.Text.Json.Serialization.Tests
{
public static partial class StreamTests_DeserializeAsyncEnumerable
{
[Theory]
[InlineData(0, 1)]
[InlineData(1, 1)]
[InlineData(10, 1)]
[InlineData(100, 1)]
[InlineData(1000, 1)]
[InlineData(1000, 1000)]
[InlineData(1000, 32000)]
public static async Task DeserializeAsyncEnumerable_ReadSimpleObjectAsync(int count, int bufferSize)
{
JsonSerializerOptions options = new JsonSerializerOptions
{
DefaultBufferSize = bufferSize
};
using var stream = new MemoryStream(GenerateJsonArray(count));
int callbackCount = 0;
await foreach(SimpleTestClass item in JsonSerializer.DeserializeAsyncEnumerable<SimpleTestClass>(stream, options))
{
Assert.Equal(callbackCount, item.MyInt32);
item.MyInt32 = 2; // Put correct value back for Verify()
item.Verify();
callbackCount++;
}
Assert.Equal(count, callbackCount);
static byte[] GenerateJsonArray(int count)
{
SimpleTestClass[] collection = new SimpleTestClass[count];
for (int i = 0; i < collection.Length; i++)
{
var obj = new SimpleTestClass();
obj.Initialize();
obj.MyInt32 = i; // verify order correctness
collection[i] = obj;
}
return JsonSerializer.SerializeToUtf8Bytes(collection);
}
}
[Theory]
[MemberData(nameof(GetAsyncEnumerableSources))]
public static async Task DeserializeAsyncEnumerable_ReadSourceAsync<TElement>(IEnumerable<TElement> source, int bufferSize)
{
JsonSerializerOptions options = new JsonSerializerOptions
{
DefaultBufferSize = bufferSize
};
byte[] data = JsonSerializer.SerializeToUtf8Bytes(source);
using var stream = new MemoryStream(data);
List<TElement> results = await JsonSerializer.DeserializeAsyncEnumerable<TElement>(stream, options).ToListAsync();
Assert.Equal(source, results);
}
[Fact]
public static void DeserializeAsyncEnumerable_NullStream_ThrowsArgumentNullException()
{
AssertExtensions.Throws<ArgumentNullException>("utf8Json", () => JsonSerializer.DeserializeAsyncEnumerable<int>(utf8Json: null));
}
[Fact]
public static async Task DeserializeAsyncEnumerable_CancellationToken_ThrowsOnCancellation()
{
JsonSerializerOptions options = new JsonSerializerOptions
{
DefaultBufferSize = 1
};
byte[] data = JsonSerializer.SerializeToUtf8Bytes(Enumerable.Range(1, 100));
var token = new CancellationToken(canceled: true);
using var stream = new MemoryStream(data);
var cancellableAsyncEnumerable = JsonSerializer.DeserializeAsyncEnumerable<int>(stream, options, token);
await Assert.ThrowsAsync<TaskCanceledException>(async () =>
{
await foreach (int element in cancellableAsyncEnumerable)
{
}
});
}
[Fact]
public static async Task DeserializeAsyncEnumerable_EnumeratorWithCancellationToken_ThrowsOnCancellation()
{
JsonSerializerOptions options = new JsonSerializerOptions
{
DefaultBufferSize = 1
};
byte[] data = JsonSerializer.SerializeToUtf8Bytes(Enumerable.Range(1, 100));
var token = new CancellationToken(canceled: true);
using var stream = new MemoryStream(data);
var cancellableAsyncEnumerable = JsonSerializer.DeserializeAsyncEnumerable<int>(stream, options).WithCancellation(token);
await Assert.ThrowsAsync<TaskCanceledException>(async () =>
{
await foreach (int element in cancellableAsyncEnumerable)
{
}
});
}
public static IEnumerable<object[]> GetAsyncEnumerableSources()
{
yield return WrapArgs(Enumerable.Empty<int>(), 1);
yield return WrapArgs(Enumerable.Range(0, 20), 1);
yield return WrapArgs(Enumerable.Range(0, 100), 20);
yield return WrapArgs(Enumerable.Range(0, 100).Select(i => $"lorem ipsum dolor: {i}"), 500);
yield return WrapArgs(Enumerable.Range(0, 10).Select(i => new { Field1 = i, Field2 = $"lorem ipsum dolor: {i}", Field3 = i % 2 == 0 }), 100);
yield return WrapArgs(Enumerable.Range(0, 100).Select(i => new { Field1 = i, Field2 = $"lorem ipsum dolor: {i}", Field3 = i % 2 == 0 }), 500);
static object[] WrapArgs<TSource>(IEnumerable<TSource> source, int bufferSize) => new object[] { source, bufferSize };
}
private static async Task<List<T>> ToListAsync<T>(this IAsyncEnumerable<T> source)
{
var list = new List<T>();
await foreach (T item in source)
{
list.Add(item);
}
return list;
}
}
}
......@@ -40,6 +40,7 @@
<Compile Include="Serialization\Array.WriteTests.cs" />
<Compile Include="Serialization\CacheTests.cs" />
<Compile Include="Serialization\CamelCaseUnitTests.cs" />
<Compile Include="Serialization\CollectionTests\CollectionTests.AsyncEnumerable.cs" />
<Compile Include="Serialization\CollectionTests\CollectionTests.Concurrent.Write.cs" />
<Compile Include="Serialization\CollectionTests\CollectionTests.Concurrent.cs" />
<Compile Include="Serialization\CollectionTests\CollectionTests.Dictionary.cs" />
......@@ -123,6 +124,7 @@
<Compile Include="Serialization\SerializationWrapper.cs" />
<Compile Include="Serialization\SpanTests.cs" />
<Compile Include="Serialization\Stream.Collections.cs" />
<Compile Include="Serialization\Stream.DeserializeAsyncEnumerable.cs" />
<Compile Include="Serialization\Stream.ReadTests.cs" />
<Compile Include="Serialization\Stream.WriteTests.cs" />
<Compile Include="Serialization\TestClasses\TestClasses.ConcurrentCollections.cs" />
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册