From 81d3a993ca115ef7cf808d9f88f1ef6adad755f6 Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Mon, 12 Apr 2021 17:06:01 +0100 Subject: [PATCH] System.Text.Json: Add IAsyncEnumerable support (#50778) * implement IAsyncEnumerable JsonConverter * Prototype of IAsyncEnumerable deserialize with Stream * Use a Queue + test buffersizes * Avoid 1 item lag * Add support for Serialize * Misc cleanup on test * extend DeserializeAsyncEnumerable test coverage also removes SerializeAsyncEnumerable components * Update src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableConverterFactory.cs Co-authored-by: Stephen Toub * address feedback * tweak test buffer values * Update src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs Co-authored-by: Stephen Toub * Update src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs Co-authored-by: Stephen Toub * address feedback * increase delayInterval in serialization tests * address feedback * address feedback * add test on exceptional IAsyncDisposable disposal * address feedback * Update src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadAsyncState.cs Co-authored-by: Layomi Akinrinade * Update src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStack.cs Co-authored-by: Layomi Akinrinade * fix build and remove dead code * address feedback * Revert unneeded JsonClassInfo.ElementType workaround * remove state allocation on async deserialization methods * remove tooling artifacts * address feedback * reset AsyncEnumeratorIsPendingCompletion field Co-authored-by: Steve Harter Co-authored-by: Stephen Toub Co-authored-by: Layomi Akinrinade --- .../System.Text.Json/ref/System.Text.Json.cs | 1 + .../src/Resources/Strings.resx | 5 +- .../src/System.Text.Json.csproj | 9 +- .../Text/Json/Serialization/ClassType.cs | 2 +- .../IAsyncEnumerableConverterFactory.cs | 31 ++ .../IAsyncEnumerableOfTConverter.cs | 126 +++++++ .../Collection/IEnumerableDefaultConverter.cs | 2 +- .../JsonResumableConverterOfT.cs | 10 +- .../JsonSerializer.Read.Stream.cs | 267 +++++++++------ .../JsonSerializer.Write.Helpers.cs | 12 +- .../JsonSerializer.Write.Stream.cs | 51 ++- .../JsonSerializerOptions.Converters.cs | 2 + .../Serialization/ReadAsyncBufferState.cs | 34 ++ .../Text/Json/Serialization/WriteStack.cs | 144 ++++++++ .../Json/Serialization/WriteStackFrame.cs | 14 + .../Text/Json/ThrowHelper.Serialization.cs | 7 + .../CollectionTests.AsyncEnumerable.cs | 320 ++++++++++++++++++ .../Stream.DeserializeAsyncEnumerable.cs | 148 ++++++++ .../tests/System.Text.Json.Tests.csproj | 2 + 19 files changed, 1069 insertions(+), 118 deletions(-) create mode 100644 src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableConverterFactory.cs create mode 100644 src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs create mode 100644 src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadAsyncBufferState.cs create mode 100644 src/libraries/System.Text.Json/tests/Serialization/CollectionTests/CollectionTests.AsyncEnumerable.cs create mode 100644 src/libraries/System.Text.Json/tests/Serialization/Stream.DeserializeAsyncEnumerable.cs diff --git a/src/libraries/System.Text.Json/ref/System.Text.Json.cs b/src/libraries/System.Text.Json/ref/System.Text.Json.cs index cf0f709ad2d..48a4c8c5f9e 100644 --- a/src/libraries/System.Text.Json/ref/System.Text.Json.cs +++ b/src/libraries/System.Text.Json/ref/System.Text.Json.cs @@ -192,6 +192,7 @@ public static partial class JsonSerializer public static object? Deserialize(ref System.Text.Json.Utf8JsonReader reader, [System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] System.Type returnType, System.Text.Json.JsonSerializerOptions? options = null) { throw null; } public static System.Threading.Tasks.ValueTask DeserializeAsync(System.IO.Stream utf8Json, [System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] System.Type returnType, System.Text.Json.JsonSerializerOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public static System.Threading.Tasks.ValueTask DeserializeAsync<[System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] TValue>(System.IO.Stream utf8Json, System.Text.Json.JsonSerializerOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public static System.Collections.Generic.IAsyncEnumerable DeserializeAsyncEnumerable<[System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] TValue>(System.IO.Stream utf8Json, System.Text.Json.JsonSerializerOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public static TValue? Deserialize<[System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] TValue>(System.ReadOnlySpan utf8Json, System.Text.Json.JsonSerializerOptions? options = null) { throw null; } public static TValue? Deserialize<[System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] TValue>(string json, System.Text.Json.JsonSerializerOptions? options = null) { throw null; } public static TValue? Deserialize<[System.Diagnostics.CodeAnalysis.DynamicallyAccessedMembersAttribute(System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicConstructors | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicFields | System.Diagnostics.CodeAnalysis.DynamicallyAccessedMemberTypes.PublicProperties)] TValue>(System.ReadOnlySpan json, System.Text.Json.JsonSerializerOptions? options = null) { throw null; } diff --git a/src/libraries/System.Text.Json/src/Resources/Strings.resx b/src/libraries/System.Text.Json/src/Resources/Strings.resx index c6039fdeba1..47c651107df 100644 --- a/src/libraries/System.Text.Json/src/Resources/Strings.resx +++ b/src/libraries/System.Text.Json/src/Resources/Strings.resx @@ -357,6 +357,9 @@ The type '{0}' is not supported. + + The type '{0}' can only be serialized using async serialization methods. + '{0}' is invalid after '/' at the beginning of the comment. Expected either '/' or '*'. @@ -557,4 +560,4 @@ The converter '{0}' cannot return an instance of JsonConverterFactory. - \ No newline at end of file + diff --git a/src/libraries/System.Text.Json/src/System.Text.Json.csproj b/src/libraries/System.Text.Json/src/System.Text.Json.csproj index cb91cbdfdb4..c84780fae36 100644 --- a/src/libraries/System.Text.Json/src/System.Text.Json.csproj +++ b/src/libraries/System.Text.Json/src/System.Text.Json.csproj @@ -73,6 +73,8 @@ + + @@ -173,6 +175,7 @@ + @@ -233,11 +236,9 @@ - + - + diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ClassType.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ClassType.cs index 7dd4fe798d1..aedc4343192 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ClassType.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ClassType.cs @@ -20,7 +20,7 @@ internal enum ClassType : byte Value = 0x2, // JsonValueConverter<> - simple values that need to re-enter the serializer such as KeyValuePair. NewValue = 0x4, - // JsonIEnumerbleConverter<> - all enumerable collections except dictionaries. + // JsonIEnumerableConverter<> - all enumerable collections except dictionaries. Enumerable = 0x8, // JsonDictionaryConverter<,> - dictionary types. Dictionary = 0x10, diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableConverterFactory.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableConverterFactory.cs new file mode 100644 index 00000000000..d9ad1615bb4 --- /dev/null +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableConverterFactory.cs @@ -0,0 +1,31 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Text.Json.Serialization.Converters; + +namespace System.Text.Json.Serialization +{ + /// + /// Converter for streaming values. + /// + internal sealed class IAsyncEnumerableConverterFactory : JsonConverterFactory + { + public override bool CanConvert(Type typeToConvert) => GetAsyncEnumerableInterface(typeToConvert) is not null; + + public override JsonConverter CreateConverter(Type typeToConvert, JsonSerializerOptions options) + { + Type? asyncEnumerableInterface = GetAsyncEnumerableInterface(typeToConvert); + Debug.Assert(asyncEnumerableInterface is not null, $"{typeToConvert} not supported by converter."); + + Type elementType = asyncEnumerableInterface.GetGenericArguments()[0]; + Type converterType = typeof(IAsyncEnumerableOfTConverter<,>).MakeGenericType(typeToConvert, elementType); + return (JsonConverter)Activator.CreateInstance(converterType)!; + } + + private static Type? GetAsyncEnumerableInterface(Type type) + => IEnumerableConverterFactoryHelpers.GetCompatibleGenericInterface(type, typeof(IAsyncEnumerable<>)); + } +} diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs new file mode 100644 index 00000000000..3deef03e10f --- /dev/null +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs @@ -0,0 +1,126 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Text.Json.Serialization.Converters +{ + internal sealed class IAsyncEnumerableOfTConverter + : IEnumerableDefaultConverter + where TAsyncEnumerable : IAsyncEnumerable + { + internal override bool OnTryRead(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options, ref ReadStack state, out TAsyncEnumerable value) + { + if (!typeToConvert.IsAssignableFrom(typeof(IAsyncEnumerable))) + { + ThrowHelper.ThrowNotSupportedException_CannotPopulateCollection(TypeToConvert, ref reader, ref state); + } + + return base.OnTryRead(ref reader, typeToConvert, options, ref state, out value!); + } + + protected override void Add(in TElement value, ref ReadStack state) + { + ((BufferedAsyncEnumerable)state.Current.ReturnValue!)._buffer.Add(value); + } + + protected override void CreateCollection(ref Utf8JsonReader reader, ref ReadStack state, JsonSerializerOptions options) + { + state.Current.ReturnValue = new BufferedAsyncEnumerable(); + } + + internal override bool OnTryWrite(Utf8JsonWriter writer, TAsyncEnumerable value, JsonSerializerOptions options, ref WriteStack state) + { + if (!state.SupportContinuation) + { + ThrowHelper.ThrowNotSupportedException_TypeRequiresAsyncSerialization(TypeToConvert); + } + + return base.OnTryWrite(writer, value, options, ref state); + } + + [Diagnostics.CodeAnalysis.SuppressMessage("Reliability", "CA2012:Use ValueTasks correctly", Justification = "Converter needs to consume ValueTask's in a non-async context")] + protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable value, JsonSerializerOptions options, ref WriteStack state) + { + IAsyncEnumerator enumerator; + ValueTask moveNextTask; + + if (state.Current.AsyncEnumerator is null) + { + enumerator = value.GetAsyncEnumerator(state.CancellationToken); + moveNextTask = enumerator.MoveNextAsync(); + // we always need to attach the enumerator to the stack + // since it will need to be disposed asynchronously. + state.Current.AsyncEnumerator = enumerator; + } + else + { + Debug.Assert(state.Current.AsyncEnumerator is IAsyncEnumerator); + enumerator = (IAsyncEnumerator)state.Current.AsyncEnumerator; + + if (state.Current.AsyncEnumeratorIsPendingCompletion) + { + // converter was previously suspended due to a pending MoveNextAsync() task + Debug.Assert(state.PendingTask is Task && state.PendingTask.IsCompleted); + moveNextTask = new ValueTask((Task)state.PendingTask); + state.Current.AsyncEnumeratorIsPendingCompletion = false; + state.PendingTask = null; + } + else + { + // converter was suspended for a different reason; + // the last MoveNextAsync() call can only have completed with 'true'. + moveNextTask = new ValueTask(true); + } + } + + JsonConverter converter = GetElementConverter(ref state); + + // iterate through the enumerator while elements are being returned synchronously + for (; moveNextTask.IsCompleted; moveNextTask = enumerator.MoveNextAsync()) + { + if (!moveNextTask.Result) + { + return true; + } + + if (ShouldFlush(writer, ref state)) + { + return false; + } + + TElement element = enumerator.Current; + if (!converter.TryWrite(writer, element, options, ref state)) + { + return false; + } + } + + // we have a pending MoveNextAsync() call; + // wrap inside a regular task so that it can be awaited multiple times; + // mark the current stackframe as pending completion. + Debug.Assert(state.PendingTask is null); + state.PendingTask = moveNextTask.AsTask(); + state.Current.AsyncEnumeratorIsPendingCompletion = true; + return false; + } + + private sealed class BufferedAsyncEnumerable : IAsyncEnumerable + { + public readonly List _buffer = new(); + +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously + public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken _) + { + foreach (TElement element in _buffer) + { + yield return element; + } + } +#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously + } + } +} diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IEnumerableDefaultConverter.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IEnumerableDefaultConverter.cs index 67613d90973..0400b617121 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IEnumerableDefaultConverter.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IEnumerableDefaultConverter.cs @@ -218,7 +218,7 @@ protected static JsonConverter GetElementConverter(ref WriteStack stat return true; } - internal sealed override bool OnTryWrite( + internal override bool OnTryWrite( Utf8JsonWriter writer, TCollection value, JsonSerializerOptions options, diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonResumableConverterOfT.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonResumableConverterOfT.cs index 05efb460b64..e1d7eb14c6b 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonResumableConverterOfT.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonResumableConverterOfT.cs @@ -36,7 +36,15 @@ public sealed override void Write(Utf8JsonWriter writer, T value, JsonSerializer WriteStack state = default; state.Initialize(typeof(T), options, supportContinuation: false); - TryWrite(writer, value, options, ref state); + try + { + TryWrite(writer, value, options, ref state); + } + catch + { + state.DisposePendingDisposablesOnException(); + throw; + } } public sealed override bool HandleNull => false; diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs index 5c695a62c43..5e4b09d6b3f 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs @@ -2,9 +2,11 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Buffers; +using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; +using System.Runtime.CompilerServices; using System.Text.Json.Serialization; using System.Threading; using System.Threading.Tasks; @@ -45,7 +47,7 @@ public static partial class JsonSerializer throw new ArgumentNullException(nameof(utf8Json)); } - return ReadAsync(utf8Json, typeof(TValue), options, cancellationToken); + return ReadAllAsync(utf8Json, typeof(TValue), options, cancellationToken); } /// @@ -83,137 +85,207 @@ public static partial class JsonSerializer if (returnType == null) throw new ArgumentNullException(nameof(returnType)); - return ReadAsync(utf8Json, returnType, options, cancellationToken); + return ReadAllAsync(utf8Json, returnType, options, cancellationToken); } - private static async ValueTask ReadAsync( + /// + /// Wraps the UTF-8 encoded text into an + /// that can be used to deserialize root-level JSON arrays in a streaming manner. + /// + /// An representation of the provided JSON array. + /// JSON data to parse. + /// Options to control the behavior during reading. + /// The which may be used to cancel the read operation. + /// An representation of the JSON value. + /// + /// is . + /// + public static IAsyncEnumerable DeserializeAsyncEnumerable<[DynamicallyAccessedMembers(MembersAccessedOnRead)] TValue>( Stream utf8Json, - Type returnType, - JsonSerializerOptions? options, - CancellationToken cancellationToken) + JsonSerializerOptions? options = null, + CancellationToken cancellationToken = default) { - if (options == null) + if (utf8Json == null) { - options = JsonSerializerOptions.s_defaultOptions; + throw new ArgumentNullException(nameof(utf8Json)); } - ReadStack state = default; - state.Initialize(returnType, options, supportContinuation: true); + options ??= JsonSerializerOptions.s_defaultOptions; + return CreateAsyncEnumerableDeserializer(utf8Json, options, cancellationToken); - JsonConverter converter = state.Current.JsonPropertyInfo!.ConverterBase; + static async IAsyncEnumerable CreateAsyncEnumerableDeserializer( + Stream utf8Json, + JsonSerializerOptions options, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + var bufferState = new ReadAsyncBufferState(options.DefaultBufferSize); + ReadStack readStack = default; + readStack.Initialize(typeof(Queue), options, supportContinuation: true); + JsonConverter converter = readStack.Current.JsonPropertyInfo!.ConverterBase; + var jsonReaderState = new JsonReaderState(options.GetReaderOptions()); - var readerState = new JsonReaderState(options.GetReaderOptions()); + try + { + do + { + bufferState = await ReadFromStreamAsync(utf8Json, bufferState, cancellationToken).ConfigureAwait(false); + ContinueDeserialize>(ref bufferState, ref jsonReaderState, ref readStack, converter, options); + if (readStack.Current.ReturnValue is Queue queue) + { + while (queue.Count > 0) + { + yield return queue.Dequeue(); + } + } + } + while (!bufferState.IsFinalBlock); + } + finally + { + bufferState.Dispose(); + } + } + } - // todo: https://github.com/dotnet/runtime/issues/32355 - int utf8BomLength = JsonConstants.Utf8Bom.Length; - byte[] buffer = ArrayPool.Shared.Rent(Math.Max(options.DefaultBufferSize, utf8BomLength)); - int bytesInBuffer = 0; - long totalBytesRead = 0; - int clearMax = 0; - bool isFirstIteration = true; + internal static async ValueTask ReadAllAsync( + Stream utf8Json, + Type inputType, + JsonSerializerOptions? options, + CancellationToken cancellationToken) + { + options ??= JsonSerializerOptions.s_defaultOptions; + var asyncState = new ReadAsyncBufferState(options.DefaultBufferSize); + ReadStack readStack = default; + readStack.Initialize(inputType, options, supportContinuation: true); + JsonConverter converter = readStack.Current.JsonPropertyInfo!.ConverterBase; + var jsonReaderState = new JsonReaderState(options.GetReaderOptions()); try { while (true) { - // Read from the stream until either our buffer is filled or we hit EOF. - // Calling ReadCore is relatively expensive, so we minimize the number of times - // we need to call it. - bool isFinalBlock = false; - while (true) + asyncState = await ReadFromStreamAsync(utf8Json, asyncState, cancellationToken).ConfigureAwait(false); + TValue value = ContinueDeserialize(ref asyncState, ref jsonReaderState, ref readStack, converter, options); + + if (asyncState.IsFinalBlock) { - int bytesRead = await utf8Json.ReadAsync( + return value!; + } + } + } + finally + { + asyncState.Dispose(); + } + } + + /// + /// Read from the stream until either our buffer is filled or we hit EOF. + /// Calling ReadCore is relatively expensive, so we minimize the number of times + /// we need to call it. + /// + internal static async ValueTask ReadFromStreamAsync( + Stream utf8Json, + ReadAsyncBufferState bufferState, + CancellationToken cancellationToken) + { + while (true) + { + int bytesRead = await utf8Json.ReadAsync( #if BUILDING_INBOX_LIBRARY - buffer.AsMemory(bytesInBuffer), + bufferState.Buffer.AsMemory(bufferState.BytesInBuffer), #else - buffer, bytesInBuffer, buffer.Length - bytesInBuffer, + bufferState.Buffer, bufferState.BytesInBuffer, bufferState.Buffer.Length - bufferState.BytesInBuffer, #endif - cancellationToken).ConfigureAwait(false); + cancellationToken).ConfigureAwait(false); - if (bytesRead == 0) - { - isFinalBlock = true; - break; - } + if (bytesRead == 0) + { + bufferState.IsFinalBlock = true; + break; + } - totalBytesRead += bytesRead; - bytesInBuffer += bytesRead; + bufferState.BytesInBuffer += bytesRead; - if (bytesInBuffer == buffer.Length) - { - break; - } - } - - if (bytesInBuffer > clearMax) - { - clearMax = bytesInBuffer; - } + if (bufferState.BytesInBuffer == bufferState.Buffer.Length) + { + break; + } + } - int start = 0; - if (isFirstIteration) - { - isFirstIteration = false; + return bufferState; + } - // Handle the UTF-8 BOM if present - Debug.Assert(buffer.Length >= JsonConstants.Utf8Bom.Length); - if (buffer.AsSpan().StartsWith(JsonConstants.Utf8Bom)) - { - start += utf8BomLength; - bytesInBuffer -= utf8BomLength; - } - } + internal static TValue ContinueDeserialize( + ref ReadAsyncBufferState bufferState, + ref JsonReaderState jsonReaderState, + ref ReadStack readStack, + JsonConverter converter, + JsonSerializerOptions options) + { + if (bufferState.BytesInBuffer > bufferState.ClearMax) + { + bufferState.ClearMax = bufferState.BytesInBuffer; + } - // Process the data available - TValue value = ReadCore( - ref readerState, - isFinalBlock, - new ReadOnlySpan(buffer, start, bytesInBuffer), - options, - ref state, - converter); + int start = 0; + if (bufferState.IsFirstIteration) + { + bufferState.IsFirstIteration = false; - Debug.Assert(state.BytesConsumed <= bytesInBuffer); - int bytesConsumed = checked((int)state.BytesConsumed); + // Handle the UTF-8 BOM if present + Debug.Assert(bufferState.Buffer.Length >= JsonConstants.Utf8Bom.Length); + if (bufferState.Buffer.AsSpan().StartsWith(JsonConstants.Utf8Bom)) + { + start += JsonConstants.Utf8Bom.Length; + bufferState.BytesInBuffer -= JsonConstants.Utf8Bom.Length; + } + } - bytesInBuffer -= bytesConsumed; + // Process the data available + TValue value = ReadCore( + ref jsonReaderState, + bufferState.IsFinalBlock, + new ReadOnlySpan(bufferState.Buffer, start, bufferState.BytesInBuffer), + options, + ref readStack, + converter); - if (isFinalBlock) - { - // The reader should have thrown if we have remaining bytes. - Debug.Assert(bytesInBuffer == 0); + Debug.Assert(readStack.BytesConsumed <= bufferState.BytesInBuffer); + int bytesConsumed = checked((int)readStack.BytesConsumed); - return value; - } + bufferState.BytesInBuffer -= bytesConsumed; - // Check if we need to shift or expand the buffer because there wasn't enough data to complete deserialization. - if ((uint)bytesInBuffer > ((uint)buffer.Length / 2)) - { - // We have less than half the buffer available, double the buffer size. - byte[] dest = ArrayPool.Shared.Rent((buffer.Length < (int.MaxValue / 2)) ? buffer.Length * 2 : int.MaxValue); + // The reader should have thrown if we have remaining bytes. + Debug.Assert(!bufferState.IsFinalBlock || bufferState.BytesInBuffer == 0); - // Copy the unprocessed data to the new buffer while shifting the processed bytes. - Buffer.BlockCopy(buffer, bytesConsumed + start, dest, 0, bytesInBuffer); + if (!bufferState.IsFinalBlock) + { + // Check if we need to shift or expand the buffer because there wasn't enough data to complete deserialization. + if ((uint)bufferState.BytesInBuffer > ((uint)bufferState.Buffer.Length / 2)) + { + // We have less than half the buffer available, double the buffer size. + byte[] oldBuffer = bufferState.Buffer; + int oldClearMax = bufferState.ClearMax; + byte[] newBuffer = ArrayPool.Shared.Rent((bufferState.Buffer.Length < (int.MaxValue / 2)) ? bufferState.Buffer.Length * 2 : int.MaxValue); - new Span(buffer, 0, clearMax).Clear(); - ArrayPool.Shared.Return(buffer); + // Copy the unprocessed data to the new buffer while shifting the processed bytes. + Buffer.BlockCopy(oldBuffer, bytesConsumed + start, newBuffer, 0, bufferState.BytesInBuffer); + bufferState.Buffer = newBuffer; + bufferState.ClearMax = bufferState.BytesInBuffer; - clearMax = bytesInBuffer; - buffer = dest; - } - else if (bytesInBuffer != 0) - { - // Shift the processed bytes to the beginning of buffer to make more room. - Buffer.BlockCopy(buffer, bytesConsumed + start, buffer, 0, bytesInBuffer); - } + // Clear and return the old buffer + new Span(oldBuffer, 0, oldClearMax).Clear(); + ArrayPool.Shared.Return(oldBuffer); + } + else if (bufferState.BytesInBuffer != 0) + { + // Shift the processed bytes to the beginning of buffer to make more room. + Buffer.BlockCopy(bufferState.Buffer, bytesConsumed + start, bufferState.Buffer, 0, bufferState.BytesInBuffer); } } - finally - { - // Clear only what we used and return the buffer to the pool - new Span(buffer, 0, clearMax).Clear(); - ArrayPool.Shared.Return(buffer); - } + + return value; } private static TValue ReadCore( @@ -234,7 +306,6 @@ public static partial class JsonSerializer state.BytesConsumed = 0; TValue? value = ReadCore(converterBase, ref reader, options, ref state); - readerState = reader.CurrentState; return value!; } diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Write.Helpers.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Write.Helpers.cs index 6f9dea9343c..75aa2de65f0 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Write.Helpers.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Write.Helpers.cs @@ -29,8 +29,16 @@ public static partial class JsonSerializer WriteStack state = default; JsonConverter jsonConverter = state.Initialize(inputType, options, supportContinuation: false); - bool success = WriteCore(jsonConverter, writer, value, options, ref state); - Debug.Assert(success); + try + { + bool success = WriteCore(jsonConverter, writer, value, options, ref state); + Debug.Assert(success); + } + catch + { + state.DisposePendingDisposablesOnException(); + throw; + } } private static bool WriteCore( diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Write.Stream.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Write.Stream.cs index 6e4a3ef2564..015b136247c 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Write.Stream.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Write.Stream.cs @@ -113,21 +113,52 @@ public static partial class JsonSerializer inputType = value!.GetType(); } - WriteStack state = default; + WriteStack state = new WriteStack { CancellationToken = cancellationToken }; JsonConverter converterBase = state.Initialize(inputType, options, supportContinuation: true); bool isFinalBlock; - do + try { - state.FlushThreshold = (int)(bufferWriter.Capacity * FlushThreshold); - - isFinalBlock = WriteCore(converterBase, writer, value, options, ref state); - - await bufferWriter.WriteToStreamAsync(utf8Json, cancellationToken).ConfigureAwait(false); - - bufferWriter.Clear(); - } while (!isFinalBlock); + do + { + state.FlushThreshold = (int)(bufferWriter.Capacity * FlushThreshold); + + try + { + isFinalBlock = WriteCore(converterBase, writer, value, options, ref state); + } + finally + { + if (state.PendingAsyncDisposables?.Count > 0) + { + await state.DisposePendingAsyncDisposables().ConfigureAwait(false); + } + } + + await bufferWriter.WriteToStreamAsync(utf8Json, cancellationToken).ConfigureAwait(false); + bufferWriter.Clear(); + + if (state.PendingTask is not null) + { + try + { + await state.PendingTask.ConfigureAwait(false); + } + catch + { + // Exceptions will be propagated elsewhere + // TODO https://github.com/dotnet/runtime/issues/22144 + } + } + + } while (!isFinalBlock); + } + catch + { + await state.DisposePendingDisposablesOnExceptionAsync().ConfigureAwait(false); + throw; + } } } } diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializerOptions.Converters.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializerOptions.Converters.cs index 2f17deede6d..226f86f171a 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializerOptions.Converters.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializerOptions.Converters.cs @@ -26,6 +26,8 @@ public sealed partial class JsonSerializerOptions // Nullable converter should always be next since it forwards to any nullable type. new NullableConverterFactory(), new EnumConverterFactory(), + // IAsyncEnumerable takes precedence over IEnumerable. + new IAsyncEnumerableConverterFactory(), // IEnumerable should always be second to last since they can convert any IEnumerable. new IEnumerableConverterFactory(), // Object should always be last since it converts any type. diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadAsyncBufferState.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadAsyncBufferState.cs new file mode 100644 index 00000000000..2fac132521f --- /dev/null +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadAsyncBufferState.cs @@ -0,0 +1,34 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Diagnostics; +using System.Threading; + +namespace System.Text.Json.Serialization +{ + internal struct ReadAsyncBufferState : IDisposable + { + public byte[] Buffer; + public int BytesInBuffer; + public int ClearMax; + public bool IsFirstIteration; + public bool IsFinalBlock; + + public ReadAsyncBufferState(int defaultBufferSize) + { + Buffer = ArrayPool.Shared.Rent(Math.Max(defaultBufferSize, JsonConstants.Utf8Bom.Length)); + BytesInBuffer = ClearMax = 0; + IsFirstIteration = true; + IsFinalBlock = false; + } + + public void Dispose() + { + // Clear only what we used and return the buffer to the pool + new Span(Buffer, 0, ClearMax).Clear(); + ArrayPool.Shared.Return(Buffer); + Buffer = null!; + } + } +} diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStack.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStack.cs index 7f231046fc0..99b2ce37e3f 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStack.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStack.cs @@ -1,9 +1,13 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Collections; using System.Collections.Generic; using System.Diagnostics; +using System.Runtime.ExceptionServices; using System.Text.Json.Serialization; +using System.Threading; +using System.Threading.Tasks; namespace System.Text.Json { @@ -20,6 +24,22 @@ internal struct WriteStack /// private int _count; + /// + /// Cancellation token used by converters performing async serialization (e.g. IAsyncEnumerable) + /// + public CancellationToken CancellationToken; + + /// + /// Stores a pending task that a resumable converter depends on to continue work. + /// It must be awaited by the root context before serialization is resumed. + /// + public Task? PendingTask; + + /// + /// List of IAsyncDisposables that have been scheduled for disposal by converters. + /// + public List? PendingAsyncDisposables; + private List _previous; // A field is used instead of a property to avoid value semantics. @@ -172,6 +192,14 @@ public void Pop(bool success) else { Debug.Assert(_continuationCount == 0); + + if (Current.AsyncEnumerator is not null) + { + // we have completed serialization of an AsyncEnumerator, + // pop from the stack and schedule for async disposal. + PendingAsyncDisposables ??= new List(); + PendingAsyncDisposables.Add(Current.AsyncEnumerator); + } } if (_count > 1) @@ -180,6 +208,122 @@ public void Pop(bool success) } } + // Asynchronously dispose of any AsyncDisposables that have been scheduled for disposal + public async ValueTask DisposePendingAsyncDisposables() + { + Debug.Assert(PendingAsyncDisposables?.Count > 0); + Exception? exception = null; + + foreach (IAsyncDisposable asyncDisposable in PendingAsyncDisposables) + { + try + { + await asyncDisposable.DisposeAsync().ConfigureAwait(false); + } + catch (Exception e) + { + exception = e; + } + } + + if (exception is not null) + { + ExceptionDispatchInfo.Capture(exception).Throw(); + } + + PendingAsyncDisposables.Clear(); + } + + /// + /// Walks the stack cleaning up any leftover IDisposables + /// in the event of an exception on serialization + /// + public void DisposePendingDisposablesOnException() + { + Exception? exception = null; + + Debug.Assert(Current.AsyncEnumerator is null); + DisposeFrame(Current.CollectionEnumerator, ref exception); + + int stackSize = Math.Max(_count, _continuationCount); + if (stackSize > 1) + { + for (int i = 0; i < stackSize - 1; i++) + { + Debug.Assert(_previous[i].AsyncEnumerator is null); + DisposeFrame(_previous[i].CollectionEnumerator, ref exception); + } + } + + if (exception is not null) + { + ExceptionDispatchInfo.Capture(exception).Throw(); + } + + static void DisposeFrame(IEnumerator? collectionEnumerator, ref Exception? exception) + { + try + { + if (collectionEnumerator is IDisposable disposable) + { + disposable.Dispose(); + } + } + catch (Exception e) + { + exception = e; + } + } + } + + /// + /// Walks the stack cleaning up any leftover I(Async)Disposables + /// in the event of an exception on async serialization + /// + public async ValueTask DisposePendingDisposablesOnExceptionAsync() + { + Exception? exception = null; + + exception = await DisposeFrame(Current.CollectionEnumerator, Current.AsyncEnumerator, exception).ConfigureAwait(false); + + int stackSize = Math.Max(_count, _continuationCount); + if (stackSize > 1) + { + for (int i = 0; i < stackSize - 1; i++) + { + exception = await DisposeFrame(_previous[i].CollectionEnumerator, _previous[i].AsyncEnumerator, exception).ConfigureAwait(false); + } + } + + if (exception is not null) + { + ExceptionDispatchInfo.Capture(exception).Throw(); + } + + static async ValueTask DisposeFrame(IEnumerator? collectionEnumerator, IAsyncDisposable? asyncDisposable, Exception? exception) + { + Debug.Assert(!(collectionEnumerator is not null && asyncDisposable is not null)); + + try + { + if (collectionEnumerator is IDisposable disposable) + { + disposable.Dispose(); + } + else if (asyncDisposable is not null) + { + await asyncDisposable.DisposeAsync().ConfigureAwait(false); + } + } + catch (Exception e) + { + exception = e; + } + + return exception; + } + } + // Return a property path as a simple JSONPath using dot-notation when possible. When special characters are present, bracket-notation is used: // $.x.y.z // $['PropertyName.With.Special.Chars'] diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStackFrame.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStackFrame.cs index 59533e5c542..c28e19c9bdd 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStackFrame.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/WriteStackFrame.cs @@ -3,6 +3,7 @@ using System.Collections; using System.Diagnostics; +using System.Threading.Tasks; using System.Text.Json.Serialization; namespace System.Text.Json @@ -15,6 +16,17 @@ internal struct WriteStackFrame /// public IEnumerator? CollectionEnumerator; + /// + /// The enumerator for resumable async disposables. + /// + public IAsyncDisposable? AsyncEnumerator; + + /// + /// The current stackframe has suspended serialization due to a pending task, + /// stored in the property. + /// + public bool AsyncEnumeratorIsPendingCompletion; + /// /// The original JsonPropertyInfo that is not changed. It contains all properties. /// @@ -113,6 +125,8 @@ public void Reset() { CollectionEnumerator = null; EnumeratorIndex = 0; + AsyncEnumerator = null; + AsyncEnumeratorIsPendingCompletion = false; IgnoreDictionaryKeyPolicy = false; JsonClassInfo = null!; OriginalDepth = 0; diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/ThrowHelper.Serialization.cs b/src/libraries/System.Text.Json/src/System/Text/Json/ThrowHelper.Serialization.cs index 34868bbf4ed..355cbd99ced 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/ThrowHelper.Serialization.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/ThrowHelper.Serialization.cs @@ -26,6 +26,13 @@ public static void ThrowNotSupportedException_SerializationNotSupported(Type pro throw new NotSupportedException(SR.Format(SR.SerializationNotSupportedType, propertyType)); } + [DoesNotReturn] + [MethodImpl(MethodImplOptions.NoInlining)] + public static void ThrowNotSupportedException_TypeRequiresAsyncSerialization(Type propertyType) + { + throw new NotSupportedException(SR.Format(SR.TypeRequiresAsyncSerialization, propertyType)); + } + [DoesNotReturn] [MethodImpl(MethodImplOptions.NoInlining)] public static void ThrowNotSupportedException_ConstructorMaxOf64Parameters(ConstructorInfo constructorInfo, Type type) diff --git a/src/libraries/System.Text.Json/tests/Serialization/CollectionTests/CollectionTests.AsyncEnumerable.cs b/src/libraries/System.Text.Json/tests/Serialization/CollectionTests/CollectionTests.AsyncEnumerable.cs new file mode 100644 index 00000000000..7c2b07d5203 --- /dev/null +++ b/src/libraries/System.Text.Json/tests/Serialization/CollectionTests/CollectionTests.AsyncEnumerable.cs @@ -0,0 +1,320 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.Text.Json.Tests.Serialization +{ + public static partial class CollectionTests + { + [Theory] + [MemberData(nameof(GetAsyncEnumerableSources))] + public static async Task WriteRootLevelAsyncEnumerable(IEnumerable source, int delayInterval, int bufferSize) + { + JsonSerializerOptions options = new JsonSerializerOptions + { + DefaultBufferSize = bufferSize + }; + + string expectedJson = JsonSerializer.Serialize(source); + + using var stream = new Utf8MemoryStream(); + var asyncEnumerable = new MockedAsyncEnumerable(source, delayInterval); + await JsonSerializer.SerializeAsync(stream, asyncEnumerable, options); + + JsonTestHelper.AssertJsonEqual(expectedJson, stream.ToString()); + Assert.Equal(1, asyncEnumerable.TotalCreatedEnumerators); + Assert.Equal(1, asyncEnumerable.TotalDisposedEnumerators); + } + + [Theory] + [MemberData(nameof(GetAsyncEnumerableSources))] + public static async Task WriteNestedAsyncEnumerable(IEnumerable source, int delayInterval, int bufferSize) + { + JsonSerializerOptions options = new JsonSerializerOptions + { + DefaultBufferSize = bufferSize + }; + + string expectedJson = JsonSerializer.Serialize(new { Data = source }); + + using var stream = new Utf8MemoryStream(); + var asyncEnumerable = new MockedAsyncEnumerable(source, delayInterval); + await JsonSerializer.SerializeAsync(stream, new { Data = asyncEnumerable }, options); + + JsonTestHelper.AssertJsonEqual(expectedJson, stream.ToString()); + Assert.Equal(1, asyncEnumerable.TotalCreatedEnumerators); + Assert.Equal(1, asyncEnumerable.TotalDisposedEnumerators); + } + + [Theory] + [MemberData(nameof(GetAsyncEnumerableSources))] + public static async Task WriteNestedAsyncEnumerable_DTO(IEnumerable source, int delayInterval, int bufferSize) + { + JsonSerializerOptions options = new JsonSerializerOptions + { + DefaultBufferSize = bufferSize + }; + + string expectedJson = JsonSerializer.Serialize(new { Data = source }); + + using var stream = new Utf8MemoryStream(); + var asyncEnumerable = new MockedAsyncEnumerable(source, delayInterval); + await JsonSerializer.SerializeAsync(stream, new AsyncEnumerableDto { Data = asyncEnumerable }, options); + + JsonTestHelper.AssertJsonEqual(expectedJson, stream.ToString()); + Assert.Equal(1, asyncEnumerable.TotalCreatedEnumerators); + Assert.Equal(1, asyncEnumerable.TotalDisposedEnumerators); + } + + [Fact, OuterLoop] + public static async Task WriteAsyncEnumerable_LongRunningEnumeration_Cancellation() + { + var longRunningEnumerable = new MockedAsyncEnumerable( + source: Enumerable.Range(1, 100), + delayInterval: 1, + delay: TimeSpan.FromMinutes(1)); + + using var utf8Stream = new Utf8MemoryStream(); + using var cts = new CancellationTokenSource(delay: TimeSpan.FromSeconds(5)); + await Assert.ThrowsAsync(async () => + await JsonSerializer.SerializeAsync(utf8Stream, longRunningEnumerable, cancellationToken: cts.Token)); + + Assert.Equal(1, longRunningEnumerable.TotalCreatedEnumerators); + Assert.Equal(1, longRunningEnumerable.TotalDisposedEnumerators); + } + + public class AsyncEnumerableDto + { + public IAsyncEnumerable Data { get; set; } + } + + [Theory] + [MemberData(nameof(GetAsyncEnumerableSources))] + public static async Task WriteSequentialNestedAsyncEnumerables(IEnumerable source, int delayInterval, int bufferSize) + { + JsonSerializerOptions options = new JsonSerializerOptions + { + DefaultBufferSize = bufferSize + }; + + string expectedJson = JsonSerializer.Serialize(new { Data1 = source, Data2 = source }); + + using var stream = new Utf8MemoryStream(); + var asyncEnumerable = new MockedAsyncEnumerable(source, delayInterval); + await JsonSerializer.SerializeAsync(stream, new { Data1 = asyncEnumerable, Data2 = asyncEnumerable }, options); + + JsonTestHelper.AssertJsonEqual(expectedJson, stream.ToString()); + Assert.Equal(2, asyncEnumerable.TotalCreatedEnumerators); + Assert.Equal(2, asyncEnumerable.TotalDisposedEnumerators); + } + + [Theory] + [MemberData(nameof(GetAsyncEnumerableSources))] + public static async Task WriteAsyncEnumerableOfAsyncEnumerables(IEnumerable source, int delayInterval, int bufferSize) + { + JsonSerializerOptions options = new JsonSerializerOptions + { + DefaultBufferSize = bufferSize + }; + + const int OuterEnumerableCount = 5; + string expectedJson = JsonSerializer.Serialize(Enumerable.Repeat(source, OuterEnumerableCount)); + + var innerAsyncEnumerable = new MockedAsyncEnumerable(source, delayInterval); + var outerAsyncEnumerable = + new MockedAsyncEnumerable>( + Enumerable.Repeat(innerAsyncEnumerable, OuterEnumerableCount), delayInterval); + + using var stream = new Utf8MemoryStream(); + await JsonSerializer.SerializeAsync(stream, outerAsyncEnumerable, options); + + JsonTestHelper.AssertJsonEqual(expectedJson, stream.ToString()); + Assert.Equal(1, outerAsyncEnumerable.TotalCreatedEnumerators); + Assert.Equal(1, outerAsyncEnumerable.TotalDisposedEnumerators); + Assert.Equal(OuterEnumerableCount, innerAsyncEnumerable.TotalCreatedEnumerators); + Assert.Equal(OuterEnumerableCount, innerAsyncEnumerable.TotalDisposedEnumerators); + } + + [Fact] + public static void WriteRootLevelAsyncEnumerableSync_ThrowsNotSupportedException() + { + IAsyncEnumerable asyncEnumerable = new MockedAsyncEnumerable(Enumerable.Range(1, 10)); + Assert.Throws(() => JsonSerializer.Serialize(asyncEnumerable)); + } + + [Fact] + public static void WriteNestedAsyncEnumerableSync_ThrowsNotSupportedException() + { + IAsyncEnumerable asyncEnumerable = new MockedAsyncEnumerable(Enumerable.Range(1, 10)); + Assert.Throws(() => JsonSerializer.Serialize(new { Data = asyncEnumerable })); + } + + [Fact] + public static async Task WriteAsyncEnumerable_ElementSerializationThrows_ShouldDisposeEnumerator() + { + using var stream = new Utf8MemoryStream(); + var asyncEnumerable = new MockedAsyncEnumerable>(Enumerable.Repeat(ThrowingEnumerable(), 2)); + + await Assert.ThrowsAsync(() => JsonSerializer.SerializeAsync(stream, new { Data = asyncEnumerable })); + Assert.Equal(1, asyncEnumerable.TotalCreatedEnumerators); + Assert.Equal(1, asyncEnumerable.TotalDisposedEnumerators); + + static IEnumerable ThrowingEnumerable() + { + yield return 0; + throw new DivideByZeroException(); + } + } + + [Fact] + public static async Task ReadRootLevelAsyncEnumerable() + { + var utf8Stream = new Utf8MemoryStream("[0,1,2,3,4]"); + + IAsyncEnumerable result = await JsonSerializer.DeserializeAsync>(utf8Stream); + Assert.Equal(new int[] { 0, 1, 2, 3, 4 }, await result.ToListAsync()); + } + + [Fact] + public static async Task ReadNestedAsyncEnumerable() + { + var utf8Stream = new Utf8MemoryStream(@"{ ""Data"" : [0,1,2,3,4] }"); + + var result = await JsonSerializer.DeserializeAsync>(utf8Stream); + Assert.Equal(new int[] { 0, 1, 2, 3, 4 }, await result.Data.ToListAsync()); + } + + [Fact] + public static async Task ReadAsyncEnumerableOfAsyncEnumerables() + { + var utf8Stream = new Utf8MemoryStream("[[0,1,2,3,4], []]"); + + var result = await JsonSerializer.DeserializeAsync>>(utf8Stream); + var resultArray = await result.ToListAsync(); + + Assert.Equal(2, resultArray.Count); + Assert.Equal(new int[] { 0, 1, 2, 3, 4 }, await resultArray[0].ToListAsync()); + Assert.Equal(Array.Empty(), await resultArray[1].ToListAsync()); + } + + [Fact] + public static async Task ReadRootLevelAsyncEnumerableDerivative_ThrowsNotSupportedException() + { + var utf8Stream = new Utf8MemoryStream("[0,1,2,3,4]"); + await Assert.ThrowsAsync(async () => await JsonSerializer.DeserializeAsync>(utf8Stream)); + } + + public static IEnumerable GetAsyncEnumerableSources() + { + yield return WrapArgs(Enumerable.Empty(), 0, 1); + yield return WrapArgs(Enumerable.Range(0, 20), 0, 1); + yield return WrapArgs(Enumerable.Range(0, 100), 20, 20); + yield return WrapArgs(Enumerable.Range(0, 1000), 20, 20); + yield return WrapArgs(Enumerable.Range(0, 100).Select(i => $"lorem ipsum dolor: {i}"), 20, 100); + yield return WrapArgs(Enumerable.Range(0, 10).Select(i => new { Field1 = i, Field2 = $"lorem ipsum dolor: {i}", Field3 = i % 2 == 0 }), 3, 100); + yield return WrapArgs(Enumerable.Range(0, 100).Select(i => new { Field1 = i, Field2 = $"lorem ipsum dolor: {i}", Field3 = i % 2 == 0 }), 20, 100); + + static object[] WrapArgs(IEnumerable source, int delayInterval, int bufferSize) => new object[]{ source, delayInterval, bufferSize }; + } + + private static async Task> ToListAsync(this IAsyncEnumerable source) + { + var list = new List(); + await foreach (T item in source) + { + list.Add(item); + } + return list; + } + + private class MockedAsyncEnumerable : IAsyncEnumerable, IEnumerable + { + private readonly IEnumerable _source; + private readonly TimeSpan _delay; + private readonly int _delayInterval; + + internal int TotalCreatedEnumerators { get; private set; } + internal int TotalDisposedEnumerators { get; private set; } + internal int TotalEnumeratedElements { get; private set; } + + public MockedAsyncEnumerable(IEnumerable source, int delayInterval = 0, TimeSpan? delay = null) + { + _source = source; + _delay = delay ?? TimeSpan.FromMilliseconds(20); + _delayInterval = delayInterval; + } + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + return new MockedAsyncEnumerator(this, cancellationToken); + } + + // Enumerator class required to instrument IAsyncDisposable calls + private class MockedAsyncEnumerator : IAsyncEnumerator + { + private readonly MockedAsyncEnumerable _enumerable; + private IAsyncEnumerator _innerEnumerator; + + public MockedAsyncEnumerator(MockedAsyncEnumerable enumerable, CancellationToken token) + { + _enumerable = enumerable; + _innerEnumerator = enumerable.GetAsyncEnumeratorInner(token); + } + + public TElement Current => _innerEnumerator.Current; + public ValueTask DisposeAsync() + { + _enumerable.TotalDisposedEnumerators++; + return _innerEnumerator.DisposeAsync(); + } + + public ValueTask MoveNextAsync() => _innerEnumerator.MoveNextAsync(); + } + + private async IAsyncEnumerator GetAsyncEnumeratorInner(CancellationToken cancellationToken = default) + { + TotalCreatedEnumerators++; + int i = 0; + foreach (TElement element in _source) + { + if (i > 0 && _delayInterval > 0 && i % _delayInterval == 0) + { + await Task.Delay(_delay, cancellationToken); + } + + if (cancellationToken.IsCancellationRequested) + { + yield break; + } + + TotalEnumeratedElements++; + yield return element; + i++; + } + } + + public IEnumerator GetEnumerator() => throw new InvalidOperationException("Collection should not be enumerated synchronously."); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + } + + private class Utf8MemoryStream : MemoryStream + { + public Utf8MemoryStream() : base() + { + } + + public Utf8MemoryStream(string text) : base(Encoding.UTF8.GetBytes(text)) + { + } + + public override string ToString() => Encoding.UTF8.GetString(ToArray()); + } + } +} diff --git a/src/libraries/System.Text.Json/tests/Serialization/Stream.DeserializeAsyncEnumerable.cs b/src/libraries/System.Text.Json/tests/Serialization/Stream.DeserializeAsyncEnumerable.cs new file mode 100644 index 00000000000..27db96a7877 --- /dev/null +++ b/src/libraries/System.Text.Json/tests/Serialization/Stream.DeserializeAsyncEnumerable.cs @@ -0,0 +1,148 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.Text.Json.Serialization.Tests +{ + public static partial class StreamTests_DeserializeAsyncEnumerable + { + [Theory] + [InlineData(0, 1)] + [InlineData(1, 1)] + [InlineData(10, 1)] + [InlineData(100, 1)] + [InlineData(1000, 1)] + [InlineData(1000, 1000)] + [InlineData(1000, 32000)] + public static async Task DeserializeAsyncEnumerable_ReadSimpleObjectAsync(int count, int bufferSize) + { + JsonSerializerOptions options = new JsonSerializerOptions + { + DefaultBufferSize = bufferSize + }; + + using var stream = new MemoryStream(GenerateJsonArray(count)); + + int callbackCount = 0; + await foreach(SimpleTestClass item in JsonSerializer.DeserializeAsyncEnumerable(stream, options)) + { + Assert.Equal(callbackCount, item.MyInt32); + + item.MyInt32 = 2; // Put correct value back for Verify() + item.Verify(); + + callbackCount++; + } + + Assert.Equal(count, callbackCount); + + static byte[] GenerateJsonArray(int count) + { + SimpleTestClass[] collection = new SimpleTestClass[count]; + for (int i = 0; i < collection.Length; i++) + { + var obj = new SimpleTestClass(); + obj.Initialize(); + obj.MyInt32 = i; // verify order correctness + collection[i] = obj; + } + + return JsonSerializer.SerializeToUtf8Bytes(collection); + } + } + + [Theory] + [MemberData(nameof(GetAsyncEnumerableSources))] + public static async Task DeserializeAsyncEnumerable_ReadSourceAsync(IEnumerable source, int bufferSize) + { + JsonSerializerOptions options = new JsonSerializerOptions + { + DefaultBufferSize = bufferSize + }; + + byte[] data = JsonSerializer.SerializeToUtf8Bytes(source); + + using var stream = new MemoryStream(data); + List results = await JsonSerializer.DeserializeAsyncEnumerable(stream, options).ToListAsync(); + Assert.Equal(source, results); + } + + [Fact] + public static void DeserializeAsyncEnumerable_NullStream_ThrowsArgumentNullException() + { + AssertExtensions.Throws("utf8Json", () => JsonSerializer.DeserializeAsyncEnumerable(utf8Json: null)); + } + + [Fact] + public static async Task DeserializeAsyncEnumerable_CancellationToken_ThrowsOnCancellation() + { + JsonSerializerOptions options = new JsonSerializerOptions + { + DefaultBufferSize = 1 + }; + + byte[] data = JsonSerializer.SerializeToUtf8Bytes(Enumerable.Range(1, 100)); + + var token = new CancellationToken(canceled: true); + using var stream = new MemoryStream(data); + var cancellableAsyncEnumerable = JsonSerializer.DeserializeAsyncEnumerable(stream, options, token); + + await Assert.ThrowsAsync(async () => + { + await foreach (int element in cancellableAsyncEnumerable) + { + } + }); + } + + [Fact] + public static async Task DeserializeAsyncEnumerable_EnumeratorWithCancellationToken_ThrowsOnCancellation() + { + JsonSerializerOptions options = new JsonSerializerOptions + { + DefaultBufferSize = 1 + }; + + byte[] data = JsonSerializer.SerializeToUtf8Bytes(Enumerable.Range(1, 100)); + + var token = new CancellationToken(canceled: true); + using var stream = new MemoryStream(data); + var cancellableAsyncEnumerable = JsonSerializer.DeserializeAsyncEnumerable(stream, options).WithCancellation(token); + + await Assert.ThrowsAsync(async () => + { + await foreach (int element in cancellableAsyncEnumerable) + { + } + }); + } + + public static IEnumerable GetAsyncEnumerableSources() + { + yield return WrapArgs(Enumerable.Empty(), 1); + yield return WrapArgs(Enumerable.Range(0, 20), 1); + yield return WrapArgs(Enumerable.Range(0, 100), 20); + yield return WrapArgs(Enumerable.Range(0, 100).Select(i => $"lorem ipsum dolor: {i}"), 500); + yield return WrapArgs(Enumerable.Range(0, 10).Select(i => new { Field1 = i, Field2 = $"lorem ipsum dolor: {i}", Field3 = i % 2 == 0 }), 100); + yield return WrapArgs(Enumerable.Range(0, 100).Select(i => new { Field1 = i, Field2 = $"lorem ipsum dolor: {i}", Field3 = i % 2 == 0 }), 500); + + static object[] WrapArgs(IEnumerable source, int bufferSize) => new object[] { source, bufferSize }; + } + + private static async Task> ToListAsync(this IAsyncEnumerable source) + { + var list = new List(); + await foreach (T item in source) + { + list.Add(item); + } + return list; + } + } +} diff --git a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests.csproj b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests.csproj index dbebf9b1814..26116948c43 100644 --- a/src/libraries/System.Text.Json/tests/System.Text.Json.Tests.csproj +++ b/src/libraries/System.Text.Json/tests/System.Text.Json.Tests.csproj @@ -40,6 +40,7 @@ + @@ -123,6 +124,7 @@ + -- GitLab