未验证 提交 9e16f098 编写于 作者: C campersau 提交者: GitHub

[browser] [wasm] Request Streaming upload (#91295)

Co-authored-by: Npavelsavara <pavel.savara@gmail.com>
上级 d8b177e2
......@@ -28,7 +28,6 @@ public abstract class HttpClientHandler_Cancellation_Test : HttpClientHandlerTes
[Theory]
[InlineData(false, CancellationMode.Token)]
[InlineData(true, CancellationMode.Token)]
[ActiveIssue("https://github.com/dotnet/runtime/issues/36634", TestPlatforms.Browser)] // out of memory
public async Task PostAsync_CancelDuringRequestContentSend_TaskCanceledQuickly(bool chunkedTransfer, CancellationMode mode)
{
if (LoopbackServerFactory.Version >= HttpVersion20.Value && chunkedTransfer)
......@@ -42,6 +41,12 @@ public async Task PostAsync_CancelDuringRequestContentSend_TaskCanceledQuickly(b
return;
}
if (PlatformDetection.IsBrowser && LoopbackServerFactory.Version < HttpVersion20.Value)
{
// Browser request streaming is only supported on HTTP/2 or higher
return;
}
var serverRelease = new TaskCompletionSource<bool>();
await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
{
......@@ -58,6 +63,13 @@ public async Task PostAsync_CancelDuringRequestContentSend_TaskCanceledQuickly(b
req.Content = new ByteAtATimeContent(int.MaxValue, waitToSend.Task, contentSending, millisecondDelayBetweenBytes: 1);
req.Headers.TransferEncodingChunked = chunkedTransfer;
if (PlatformDetection.IsBrowser)
{
#if !NETFRAMEWORK
req.Options.Set(new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest"), true);
#endif
}
Task<HttpResponseMessage> resp = client.SendAsync(TestAsync, req, HttpCompletionOption.ResponseHeadersRead, cts.Token);
waitToSend.SetResult(true);
await Task.WhenAny(contentSending.Task, resp);
......
......@@ -1886,9 +1886,11 @@ public async Task SendAsync_101SwitchingProtocolsResponse_Success()
}
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task PostAsync_ThrowFromContentCopy_RequestFails(bool syncFailure)
[InlineData(false, false)]
[InlineData(false, true)]
[InlineData(true, false)]
[InlineData(true, true)]
public async Task PostAsync_ThrowFromContentCopy_RequestFails(bool syncFailure, bool enableWasmStreaming)
{
if (UseVersion == HttpVersion30)
{
......@@ -1896,6 +1898,18 @@ public async Task PostAsync_ThrowFromContentCopy_RequestFails(bool syncFailure)
return;
}
if (enableWasmStreaming && !PlatformDetection.IsBrowser)
{
// enableWasmStreaming makes only sense on Browser platform
return;
}
if (enableWasmStreaming && PlatformDetection.IsBrowser && UseVersion < HttpVersion20.Value)
{
// Browser request streaming is only supported on HTTP/2 or higher
return;
}
await LoopbackServer.CreateServerAsync(async (server, uri) =>
{
Task responseTask = server.AcceptConnectionAsync(async connection =>
......@@ -1914,8 +1928,20 @@ public async Task PostAsync_ThrowFromContentCopy_RequestFails(bool syncFailure)
canReadFunc: () => true,
readFunc: (buffer, offset, count) => throw error,
readAsyncFunc: (buffer, offset, count, cancellationToken) => syncFailure ? throw error : Task.Delay(1).ContinueWith<int>(_ => throw error)));
var request = new HttpRequestMessage(HttpMethod.Post, uri);
request.Content = content;
if (PlatformDetection.IsBrowser)
{
if (enableWasmStreaming)
{
#if !NETFRAMEWORK
request.Options.Set(new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest"), true);
#endif
}
}
Assert.Same(error, await Assert.ThrowsAsync<FormatException>(() => client.PostAsync(uri, content)));
Assert.Same(error, await Assert.ThrowsAsync<FormatException>(() => client.SendAsync(request)));
}
});
}
......
......@@ -229,9 +229,147 @@ await client.GetAsync(remoteServer.EchoUri, HttpCompletionOption.ResponseHeaders
}
#if NETCOREAPP
[OuterLoop]
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))]
public async Task BrowserHttpHandler_Streaming()
{
var WebAssemblyEnableStreamingRequestKey = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest");
var WebAssemblyEnableStreamingResponseKey = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse");
var req = new HttpRequestMessage(HttpMethod.Post, Configuration.Http.RemoteHttp2Server.BaseUri + "echobody.ashx");
req.Options.Set(WebAssemblyEnableStreamingRequestKey, true);
req.Options.Set(WebAssemblyEnableStreamingResponseKey, true);
byte[] body = new byte[1024 * 1024];
Random.Shared.NextBytes(body);
int readOffset = 0;
req.Content = new StreamContent(new DelegateStream(
readAsyncFunc: async (buffer, offset, count, cancellationToken) =>
{
await Task.Delay(1);
if (readOffset < body.Length)
{
int send = Math.Min(body.Length - readOffset, count);
body.AsSpan(readOffset, send).CopyTo(buffer.AsSpan(offset, send));
readOffset += send;
return send;
}
return 0;
}));
using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp2Server))
// we need to switch off Response buffering of default ResponseContentRead option
using (HttpResponseMessage response = await client.SendAsync(req, HttpCompletionOption.ResponseHeadersRead))
{
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
// Streaming requests can't set Content-Length
Assert.False(response.Headers.Contains("X-HttpRequest-Headers-ContentLength"));
// Streaming response uses StreamContent
Assert.Equal(typeof(StreamContent), response.Content.GetType());
var stream = await response.Content.ReadAsStreamAsync();
Assert.Equal("ReadOnlyStream", stream.GetType().Name);
var buffer = new byte[1024 * 1024];
int totalCount = 0;
int fetchedCount = 0;
do
{
fetchedCount = await stream.ReadAsync(buffer, 0, buffer.Length);
Assert.True(body.AsSpan(totalCount, fetchedCount).SequenceEqual(buffer.AsSpan(0, fetchedCount)));
totalCount += fetchedCount;
} while (fetchedCount != 0);
Assert.Equal(body.Length, totalCount);
}
}
[OuterLoop]
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))]
[InlineData(true)]
[InlineData(false)]
public async Task BrowserHttpHandler_StreamingRequest(bool useStringContent)
{
var WebAssemblyEnableStreamingRequestKey = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest");
var req = new HttpRequestMessage(HttpMethod.Post, Configuration.Http.Http2RemoteVerifyUploadServer);
req.Options.Set(WebAssemblyEnableStreamingRequestKey, true);
int size;
if (useStringContent)
{
string bodyContent = "Hello World";
size = bodyContent.Length;
req.Content = new StringContent(bodyContent);
}
else
{
size = 1500 * 1024 * 1024;
int remaining = size;
req.Content = new StreamContent(new DelegateStream(
readAsyncFunc: (buffer, offset, count, cancellationToken) =>
{
if (remaining > 0)
{
int send = Math.Min(remaining, count);
buffer.AsSpan(offset, send).Fill(65);
remaining -= send;
return Task.FromResult(send);
}
return Task.FromResult(0);
}));
}
req.Content.Headers.Add("Content-MD5-Skip", "browser");
using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp2Server))
using (HttpResponseMessage response = await client.SendAsync(req))
{
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
Assert.Equal(size.ToString(), Assert.Single(response.Headers.GetValues("X-HttpRequest-Body-Length")));
// Streaming requests can't set Content-Length
Assert.Equal(useStringContent, response.Headers.Contains("X-HttpRequest-Headers-ContentLength"));
if (useStringContent)
{
Assert.Equal(size.ToString(), Assert.Single(response.Headers.GetValues("X-HttpRequest-Headers-ContentLength")));
}
}
}
// Duplicate of PostAsync_ThrowFromContentCopy_RequestFails using remote server
[OuterLoop]
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))]
[InlineData(false)]
[InlineData(true)]
public async Task BrowserHttpHandler_StreamingRequest_ThrowFromContentCopy_RequestFails(bool syncFailure)
{
var WebAssemblyEnableStreamingRequestKey = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest");
var req = new HttpRequestMessage(HttpMethod.Post, Configuration.Http.Http2RemoteEchoServer);
req.Options.Set(WebAssemblyEnableStreamingRequestKey, true);
Exception error = new FormatException();
var content = new StreamContent(new DelegateStream(
canSeekFunc: () => true,
lengthFunc: () => 12345678,
positionGetFunc: () => 0,
canReadFunc: () => true,
readFunc: (buffer, offset, count) => throw error,
readAsyncFunc: (buffer, offset, count, cancellationToken) => syncFailure ? throw error : Task.Delay(1).ContinueWith<int>(_ => throw error)));
req.Content = content;
using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp2Server))
{
Assert.Same(error, await Assert.ThrowsAsync<FormatException>(() => client.SendAsync(req)));
}
}
[OuterLoop]
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))]
public async Task BrowserHttpHandler_StreamingResponse()
{
var WebAssemblyEnableStreamingResponseKey = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse");
......@@ -244,6 +382,7 @@ public async Task BrowserHttpHandler_Streaming()
// we need to switch off Response buffering of default ResponseContentRead option
using (HttpResponseMessage response = await client.SendAsync(req, HttpCompletionOption.ResponseHeadersRead))
{
// Streaming response uses StreamContent
Assert.Equal(typeof(StreamContent), response.Content.GetType());
Assert.Equal("application/octet-stream", response.Content.Headers.ContentType.MediaType);
......
......@@ -88,6 +88,11 @@ public async Task Invoke(HttpContext context)
await LargeResponseHandler.InvokeAsync(context);
return;
}
if (path.Equals(new PathString("/echobody.ashx")))
{
await EchoBodyHandler.InvokeAsync(context);
return;
}
// Default handling.
await EchoHandler.InvokeAsync(context);
......
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
namespace NetCoreServer
{
public class EchoBodyHandler
{
public static async Task InvokeAsync(HttpContext context)
{
context.Features.Get<IHttpMaxRequestBodySizeFeature>().MaxRequestBodySize = null;
// Report back original request method verb.
context.Response.Headers["X-HttpRequest-Method"] = context.Request.Method;
// Report back original entity-body related request headers.
string contentLength = context.Request.Headers["Content-Length"];
if (!string.IsNullOrEmpty(contentLength))
{
context.Response.Headers["X-HttpRequest-Headers-ContentLength"] = contentLength;
}
string transferEncoding = context.Request.Headers["Transfer-Encoding"];
if (!string.IsNullOrEmpty(transferEncoding))
{
context.Response.Headers["X-HttpRequest-Headers-TransferEncoding"] = transferEncoding;
}
context.Response.StatusCode = 200;
context.Response.ContentType = context.Request.ContentType;
await context.Request.Body.CopyToAsync(context.Response.Body);
}
}
}
......@@ -6,6 +6,7 @@
using System.Security.Cryptography;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
namespace NetCoreServer
{
......@@ -13,6 +14,8 @@ public class VerifyUploadHandler
{
public static async Task InvokeAsync(HttpContext context)
{
context.Features.Get<IHttpMaxRequestBodySizeFeature>().MaxRequestBodySize = null;
// Report back original request method verb.
context.Response.Headers["X-HttpRequest-Method"] = context.Request.Method;
......@@ -29,12 +32,15 @@ public static async Task InvokeAsync(HttpContext context)
context.Response.Headers["X-HttpRequest-Headers-TransferEncoding"] = transferEncoding;
}
// Get request body.
byte[] requestBodyBytes = await ReadAllRequestBytesAsync(context);
// Compute MD5 hash of received request body.
(byte[] md5Bytes, int bodyLength) = await ComputeMD5HashRequestBodyAsync(context);
// Report back the actual body length.
context.Response.Headers["X-HttpRequest-Body-Length"] = bodyLength.ToString();
// Skip MD5 checksum for empty request body
// Skip MD5 checksum for empty request body
// or for requests which opt to skip it due to [ActiveIssue("https://github.com/dotnet/runtime/issues/37669", TestPlatforms.Browser)]
if (requestBodyBytes.Length == 0 || !string.IsNullOrEmpty(context.Request.Headers["Content-MD5-Skip"]))
if (bodyLength == 0 || !string.IsNullOrEmpty(context.Request.Headers["Content-MD5-Skip"]))
{
context.Response.StatusCode = 200;
return;
......@@ -49,13 +55,7 @@ public static async Task InvokeAsync(HttpContext context)
return;
}
// Compute MD5 hash of received request body.
string actualHash;
using (MD5 md5 = MD5.Create())
{
byte[] hash = md5.ComputeHash(requestBodyBytes);
actualHash = Convert.ToBase64String(hash);
}
string actualHash = Convert.ToBase64String(md5Bytes);
if (expectedHash == actualHash)
{
......@@ -66,21 +66,22 @@ public static async Task InvokeAsync(HttpContext context)
context.Response.StatusCode = 400;
context.Response.SetStatusDescription("Received request body fails MD5 checksum");
}
}
private static async Task<byte[]> ReadAllRequestBytesAsync(HttpContext context)
private static async Task<(byte[] MD5Hash, int BodyLength)> ComputeMD5HashRequestBodyAsync(HttpContext context)
{
Stream requestStream = context.Request.Body;
byte[] buffer = new byte[16 * 1024];
using (MemoryStream ms = new MemoryStream())
using (MD5 md5 = MD5.Create())
{
int read;
int read, size = 0;
while ((read = await requestStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
ms.Write(buffer, 0, read);
size += read;
md5.TransformBlock(buffer, 0, read, buffer, 0);
}
return ms.ToArray();
md5.TransformFinalBlock(buffer, 0, read);
return (md5.Hash, size);
}
}
}
......
......@@ -26,6 +26,7 @@
<ItemGroup>
<Compile Include="Handlers\DeflateHandler.cs" />
<Compile Include="Handlers\EchoHandler.cs" />
<Compile Include="Handlers\EchoBodyHandler.cs" />
<Compile Include="Handlers\EchoWebSocketHandler.cs" />
<Compile Include="Handlers\EchoWebSocketHeadersHandler.cs" />
<Compile Include="Handlers\EmptyContentHandler.cs" />
......
......@@ -15,6 +15,7 @@ namespace System.Net.Http
// the JavaScript objects have thread affinity, it is necessary that the continuations run the same thread as the start of the async method.
internal sealed class BrowserHttpHandler : HttpMessageHandler
{
private static readonly HttpRequestOptionsKey<bool> EnableStreamingRequest = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest");
private static readonly HttpRequestOptionsKey<bool> EnableStreamingResponse = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse");
private static readonly HttpRequestOptionsKey<IDictionary<string, object>> FetchOptions = new HttpRequestOptionsKey<IDictionary<string, object>>("WebAssemblyFetchOptions");
private bool _allowAutoRedirect = HttpHandlerDefaults.DefaultAutomaticRedirection;
......@@ -220,10 +221,28 @@ private static async Task<WasmFetchResponse> CallFetch(HttpRequestMessage reques
}
else
{
byte[] buffer = await request.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(true);
cancellationToken.ThrowIfCancellationRequested();
bool streamingEnabled = false;
if (BrowserHttpInterop.SupportsStreamingRequest())
{
request.Options.TryGetValue(EnableStreamingRequest, out streamingEnabled);
}
if (streamingEnabled)
{
Stream stream = await request.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(true);
cancellationToken.ThrowIfCancellationRequested();
ReadableStreamPullState pullState = new ReadableStreamPullState(stream, cancellationToken);
promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, ReadableStreamPull, pullState);
}
else
{
byte[] buffer = await request.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(true);
cancellationToken.ThrowIfCancellationRequested();
promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, buffer);
promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, buffer);
}
}
}
else
......@@ -248,6 +267,14 @@ private static async Task<WasmFetchResponse> CallFetch(HttpRequestMessage reques
}
}
private static void ReadableStreamPull(object state)
{
ReadableStreamPullState pullState = (ReadableStreamPullState)state;
#pragma warning disable CS4014 // intentionally not awaited
pullState.PullAsync();
#pragma warning restore CS4014
}
private static HttpResponseMessage ConvertResponse(HttpRequestMessage request, WasmFetchResponse fetchResponse)
{
#if FEATURE_WASM_THREADS
......@@ -312,6 +339,43 @@ static async Task<HttpResponseMessage> Impl(HttpRequestMessage request, Cancella
}
}
internal sealed class ReadableStreamPullState
{
private readonly Stream _stream;
private readonly CancellationToken _cancellationToken;
private readonly byte[] _buffer;
public ReadableStreamPullState(Stream stream, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(stream);
_stream = stream;
_cancellationToken = cancellationToken;
_buffer = new byte[65536];
}
public async Task PullAsync()
{
try
{
int length = await _stream.ReadAsync(_buffer, _cancellationToken).ConfigureAwait(true);
ReadableStreamControllerEnqueueUnsafe(this, _buffer, length);
}
catch (Exception ex)
{
BrowserHttpInterop.ReadableStreamControllerError(this, ex);
}
}
private static unsafe void ReadableStreamControllerEnqueueUnsafe(object pullState, byte[] buffer, int length)
{
fixed (byte* ptr = buffer)
{
BrowserHttpInterop.ReadableStreamControllerEnqueue(pullState, (nint)ptr, length);
}
}
}
internal sealed class WasmFetchResponse : IDisposable
{
#if FEATURE_WASM_THREADS
......
......@@ -11,6 +11,9 @@ namespace System.Net.Http
{
internal static partial class BrowserHttpInterop
{
[JSImport("INTERNAL.http_wasm_supports_streaming_request")]
public static partial bool SupportsStreamingRequest();
[JSImport("INTERNAL.http_wasm_supports_streaming_response")]
public static partial bool SupportsStreamingResponse();
......@@ -25,6 +28,17 @@ internal static partial class BrowserHttpInterop
public static partial void AbortResponse(
JSObject fetchResponse);
[JSImport("INTERNAL.http_wasm_readable_stream_controller_enqueue")]
public static partial void ReadableStreamControllerEnqueue(
[JSMarshalAs<JSType.Any>] object pullState,
IntPtr bufferPtr,
int bufferLength);
[JSImport("INTERNAL.http_wasm_readable_stream_controller_error")]
public static partial void ReadableStreamControllerError(
[JSMarshalAs<JSType.Any>] object pullState,
Exception error);
[JSImport("INTERNAL.http_wasm_get_response_header_names")]
private static partial string[] _GetResponseHeaderNames(
JSObject fetchResponse);
......@@ -58,6 +72,17 @@ public static void GetResponseHeaders(JSObject fetchResponse, HttpHeaders respos
JSObject abortControler,
string? body = null);
[JSImport("INTERNAL.http_wasm_fetch_stream")]
public static partial Task<JSObject> Fetch(
string uri,
string[] headerNames,
string[] headerValues,
string[] optionNames,
[JSMarshalAs<JSType.Array<JSType.Any>>] object?[] optionValues,
JSObject abortControler,
[JSMarshalAs<JSType.Function<JSType.Any>>] Action<object> pull,
[JSMarshalAs<JSType.Any>] object pullState);
[JSImport("INTERNAL.http_wasm_fetch_bytes")]
private static partial Task<JSObject> FetchBytes(
string uri,
......@@ -67,8 +92,7 @@ public static void GetResponseHeaders(JSObject fetchResponse, HttpHeaders respos
[JSMarshalAs<JSType.Array<JSType.Any>>] object?[] optionValues,
JSObject abortControler,
IntPtr bodyPtr,
int bodyLength
);
int bodyLength);
public static unsafe Task<JSObject> Fetch(string uri, string[] headerNames, string[] headerValues, string[] optionNames, object?[] optionValues, JSObject abortControler, byte[] body)
{
......
......@@ -4,7 +4,7 @@
import { mono_wasm_cancel_promise } from "./cancelable-promise";
import cwraps, { profiler_c_functions } from "./cwraps";
import { mono_wasm_send_dbg_command_with_parms, mono_wasm_send_dbg_command, mono_wasm_get_dbg_command_info, mono_wasm_get_details, mono_wasm_release_object, mono_wasm_call_function_on, mono_wasm_debugger_resume, mono_wasm_detach_debugger, mono_wasm_raise_debug_event, mono_wasm_change_debugger_log_level, mono_wasm_debugger_attached } from "./debug";
import { http_wasm_supports_streaming_response, http_wasm_create_abort_controler, http_wasm_abort_request, http_wasm_abort_response, http_wasm_fetch, http_wasm_fetch_bytes, http_wasm_get_response_header_names, http_wasm_get_response_header_values, http_wasm_get_response_bytes, http_wasm_get_response_length, http_wasm_get_streamed_response_bytes } from "./http";
import { http_wasm_supports_streaming_request, http_wasm_supports_streaming_response, http_wasm_create_abort_controler, http_wasm_abort_request, http_wasm_abort_response, http_wasm_readable_stream_controller_enqueue, http_wasm_readable_stream_controller_error, http_wasm_fetch, http_wasm_fetch_stream, http_wasm_fetch_bytes, http_wasm_get_response_header_names, http_wasm_get_response_header_values, http_wasm_get_response_bytes, http_wasm_get_response_length, http_wasm_get_streamed_response_bytes } from "./http";
import { exportedRuntimeAPI, Module, runtimeHelpers } from "./globals";
import { get_property, set_property, has_property, get_typeof_property, get_global_this, dynamic_import } from "./invoke-js";
import { mono_wasm_stringify_as_error_with_stack } from "./logging";
......@@ -64,11 +64,15 @@ export function export_internal(): any {
ws_wasm_abort,
// BrowserHttpHandler
http_wasm_supports_streaming_request,
http_wasm_supports_streaming_response,
http_wasm_create_abort_controler,
http_wasm_abort_request,
http_wasm_abort_response,
http_wasm_readable_stream_controller_enqueue,
http_wasm_readable_stream_controller_error,
http_wasm_fetch,
http_wasm_fetch_stream,
http_wasm_fetch_bytes,
http_wasm_get_response_header_names,
http_wasm_get_response_header_values,
......
......@@ -2,9 +2,10 @@
// The .NET Foundation licenses this file to you under the MIT license.
import { wrap_as_cancelable_promise } from "./cancelable-promise";
import { ENVIRONMENT_IS_NODE, Module, loaderHelpers, mono_assert } from "./globals";
import { MemoryViewType, Span } from "./marshal";
import { ENVIRONMENT_IS_NODE, Module, createPromiseController, loaderHelpers, mono_assert } from "./globals";
import { ManagedObject, MemoryViewType, Span } from "./marshal";
import type { VoidPtr } from "./types/emscripten";
import { ControllablePromise, PromiseController } from "./types/internal";
function verifyEnvironment() {
......@@ -16,6 +17,29 @@ function verifyEnvironment() {
}
}
export function http_wasm_supports_streaming_request(): boolean {
// Detecting streaming request support works like this:
// If the browser doesn't support a particular body type, it calls toString() on the object and uses the result as the body.
// So, if the browser doesn't support request streams, the request body becomes the string "[object ReadableStream]".
// When a string is used as a body, it conveniently sets the Content-Type header to text/plain;charset=UTF-8.
// So, if that header is set, then we know the browser doesn't support streams in request objects, and we can exit early.
// Safari does support streams in request objects, but doesn't allow them to be used with fetch, so the duplex option is tested, which Safari doesn't currently support.
// See https://developer.chrome.com/articles/fetch-streaming-requests/
if (typeof Request !== "undefined" && "body" in Request.prototype && typeof ReadableStream === "function") {
let duplexAccessed = false;
const hasContentType = new Request("", {
body: new ReadableStream(),
method: "POST",
get duplex() {
duplexAccessed = true;
return "half";
},
} as RequestInit /* https://github.com/microsoft/TypeScript-DOM-lib-generator/issues/1483 */).headers.has("Content-Type");
return duplexAccessed && !hasContentType;
}
return false;
}
export function http_wasm_supports_streaming_response(): boolean {
return typeof Response !== "undefined" && "body" in Response.prototype && typeof ReadableStream === "function";
}
......@@ -41,14 +65,90 @@ export function http_wasm_abort_response(res: ResponseExtension): void {
}
}
export function http_wasm_readable_stream_controller_enqueue(pull_state: PullStateExtension, bufferPtr: VoidPtr, bufferLength: number): void {
const controller = pull_state.__controller;
const pull_promise_control = pull_state.__pull_promise_control;
mono_assert(controller, "expected controller");
mono_assert(pull_promise_control, "expected pull_promise_control");
try {
if (bufferLength === 0) {
controller.close();
pull_state.dispose();
pull_state.__pull_promise_control = null;
pull_state.__controller = null;
} else {
// the bufferPtr is pinned by the caller
const view = new Span(bufferPtr, bufferLength, MemoryViewType.Byte);
// because https://github.com/WebAssembly/design/issues/1162 we need to copy the buffer
// also it doesn't make much sense to use byob
const copy = view.slice() as Uint8Array;
controller.enqueue(copy);
}
pull_promise_control.resolve();
}
catch (err) {
pull_state.dispose();
pull_promise_control.reject(err);
}
finally {
pull_state.__pull_promise_control = null;
}
}
export function http_wasm_readable_stream_controller_error(pull_state: PullStateExtension, error: Error): void {
const controller = pull_state.__controller;
mono_assert(controller, "expected controller");
pull_state.__pull_promise_control?.reject(error);
pull_state.__fetch_promise_control?.reject(error);
pull_state.dispose();
controller.error(error);
pull_state.__pull_promise_control = null;
}
export function http_wasm_fetch_stream(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController,
pull_delegate: (pull_state: PullStateExtension) => void,
pull_state: PullStateExtension): Promise<ResponseExtension> {
function pull(controller: ReadableByteStreamController): Promise<void> {
const { promise, promise_control } = createPromiseController<void>();
try {
mono_assert(!pull_state.__pull_promise_control, "expected pull_promise_control to be null");
pull_state.__controller = controller;
pull_state.__pull_promise_control = promise_control;
pull_delegate(pull_state);
return promise;
}
catch (error) {
pull_state.dispose();
pull_state.__controller = null;
pull_state.__pull_promise_control = null;
pull_state.__fetch_promise_control?.reject(error);
return Promise.reject(error);
}
}
function cancel(error: any) {
pull_state.__fetch_promise_control?.reject(error);
}
const body = new ReadableStream({
type: "bytes",
pull,
cancel
});
const cancelable_promise = http_wasm_fetch(url, header_names, header_values, option_names, option_values, abort_controller, body);
pull_state.__fetch_promise_control = loaderHelpers.getPromiseController(cancelable_promise);
return cancelable_promise;
}
export function http_wasm_fetch_bytes(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, bodyPtr: VoidPtr, bodyLength: number): Promise<ResponseExtension> {
// the bufferPtr is pinned by the caller
// the bodyPtr is pinned by the caller
const view = new Span(bodyPtr, bodyLength, MemoryViewType.Byte);
const copy = view.slice() as Uint8Array;
return http_wasm_fetch(url, header_names, header_values, option_names, option_values, abort_controller, copy);
}
export function http_wasm_fetch(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, body: string | Uint8Array | null): Promise<ResponseExtension> {
export function http_wasm_fetch(url: string, header_names: string[], header_values: string[], option_names: string[], option_values: any[], abort_controller: AbortController, body: string | Uint8Array | ReadableStream | null): ControllablePromise<ResponseExtension> {
verifyEnvironment();
mono_assert(url && typeof url === "string", "expected url string");
mono_assert(header_names && header_values && Array.isArray(header_names) && Array.isArray(header_values) && header_names.length === header_values.length, "expected headerNames and headerValues arrays");
......@@ -62,6 +162,9 @@ export function http_wasm_fetch(url: string, header_names: string[], header_valu
headers,
signal: abort_controller.signal
};
if (typeof ReadableStream !== "undefined" && body instanceof ReadableStream) {
options.duplex = "half";
}
for (let i = 0; i < option_names.length; i++) {
options[option_names[i]] = option_values[i];
}
......@@ -149,6 +252,12 @@ export function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bu
});
}
interface PullStateExtension extends ManagedObject {
__pull_promise_control: PromiseController<void> | null
__fetch_promise_control: PromiseController<ResponseExtension> | null
__controller: ReadableByteStreamController | null
}
interface ResponseExtension extends Response {
__buffer?: ArrayBuffer
__reader?: ReadableStreamDefaultReader<Uint8Array>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册