未验证 提交 bb971149 编写于 作者: P Pavel Savara 提交者: GitHub

[wasm] crypto deadlock fix (#73537)

* more state locking and sanity
* renamed the subtle-crypto.ts file, because that's not a worker
Co-authored-by: NAnkit Jain <radical@gmail.com>
上级 2594ec1b
import MonoWasmThreads from "consts:monoWasmThreads";
import { dotnet_browser_can_use_subtle_crypto_impl, dotnet_browser_simple_digest_hash, dotnet_browser_sign, dotnet_browser_encrypt_decrypt, dotnet_browser_derive_bits } from "./crypto-worker";
import { dotnet_browser_can_use_subtle_crypto_impl, dotnet_browser_simple_digest_hash, dotnet_browser_sign, dotnet_browser_encrypt_decrypt, dotnet_browser_derive_bits } from "./subtle-crypto";
import { mono_wasm_fire_debugger_agent_message, mono_wasm_debugger_log, mono_wasm_add_dbg_command_received, mono_wasm_set_entrypoint_breakpoint } from "./debug";
import { mono_wasm_release_cs_owned_object } from "./gc-handles";
import { mono_wasm_load_icu_data, mono_wasm_get_icudt_name } from "./icu";
......
......@@ -13,7 +13,7 @@ import { mono_wasm_init_aot_profiler, mono_wasm_init_coverage_profiler } from ".
import { mono_on_abort, set_exit_code } from "./run";
import { initialize_marshalers_to_cs } from "./marshal-to-cs";
import { initialize_marshalers_to_js } from "./marshal-to-js";
import { init_crypto } from "./crypto-worker";
import { init_crypto } from "./subtle-crypto";
import { init_polyfills_async } from "./polyfills";
import * as pthreads_worker from "./pthreads/worker";
import { createPromiseController } from "./promise-controller";
......
......@@ -207,22 +207,13 @@ class LibraryChannel {
public send_msg(msg: string): string {
try {
let state = Atomics.load(this.comm, this.STATE_IDX);
// FIXME: this console write is possibly serializing the access and prevents a deadlock
if (state !== this.STATE_IDLE) console.debug(`MONO_WASM_ENCRYPT_DECRYPT: send_msg, waiting for idle now, ${state}`);
state = this.wait_for_state(pstate => pstate == this.STATE_IDLE, "waiting");
this.wait_for_state_change_to(pstate => pstate == this.STATE_IDLE, "waiting");
this.send_request(msg);
return this.read_response();
} catch (err) {
this.reset(LibraryChannel._stringify_err(err));
this.reset(LibraryChannel.stringify_err(err));
throw err;
}
finally {
const state = Atomics.load(this.comm, this.STATE_IDX);
// FIXME: this console write is possibly serializing the access and prevents a deadlock
if (state !== this.STATE_IDLE) console.debug(`MONO_WASM_ENCRYPT_DECRYPT: state at end of send_msg: ${state}`);
}
}
public shutdown(): void {
......@@ -231,9 +222,11 @@ class LibraryChannel {
if (state !== this.STATE_IDLE)
throw new Error(`OWNER: Invalid sync communication channel state: ${state}`);
this.using_lock(() => {
Atomics.store(this.comm, this.MSG_SIZE_IDX, 0);
this.change_state_locked(this.STATE_SHUTDOWN);
});
// Notify webworker
Atomics.store(this.comm, this.MSG_SIZE_IDX, 0);
this._change_state_locked(this.STATE_SHUTDOWN);
Atomics.notify(this.comm, this.STATE_IDX);
}
......@@ -248,8 +241,11 @@ class LibraryChannel {
return;
}
Atomics.store(this.comm, this.MSG_SIZE_IDX, 0);
this._change_state_locked(this.STATE_RESET);
this.using_lock(() => {
Atomics.store(this.comm, this.MSG_SIZE_IDX, 0);
this.change_state_locked(this.STATE_RESET);
});
// Notify webworker
Atomics.notify(this.comm, this.STATE_IDX);
}
......@@ -259,9 +255,7 @@ class LibraryChannel {
let msg_written = 0;
for (; ;) {
this.acquire_lock();
try {
this.using_lock(() => {
// Write the message and return how much was written.
const wrote = this.write_to_msg(msg, msg_written, msg_len);
msg_written += wrote;
......@@ -272,19 +266,20 @@ class LibraryChannel {
// Indicate if this was the whole message or part of it.
state = msg_written === msg_len ? this.STATE_REQ : this.STATE_REQ_P;
// Notify webworker
this._change_state_locked(state);
} finally {
this.release_lock();
}
this.change_state_locked(state);
});
// Notify webworker
Atomics.notify(this.comm, this.STATE_IDX);
// The send message is complete.
if (state === this.STATE_REQ)
if (state === this.STATE_REQ) {
break;
}
else if (state !== this.STATE_REQ_P) {
throw new Error(`Unexpected state ${state}`);
}
this.wait_for_state(state => state == this.STATE_AWAIT, "send_request");
this.wait_for_state_change_to(state => state == this.STATE_AWAIT, "send_request");
}
}
......@@ -302,56 +297,63 @@ class LibraryChannel {
private read_response(): string {
let response = "";
for (; ;) {
const state = this.wait_for_state(state => state == this.STATE_RESP || state == this.STATE_RESP_P, "read_response");
this.acquire_lock();
try {
this.wait_for_state_change_to(state => state == this.STATE_RESP || state == this.STATE_RESP_P, "read_response");
const done = this.using_lock(() => {
const size_to_read = Atomics.load(this.comm, this.MSG_SIZE_IDX);
// Append the latest part of the message.
response += this.read_from_msg(0, size_to_read);
// The response is complete.
const state = Atomics.load(this.comm, this.STATE_IDX);
if (state === this.STATE_RESP) {
Atomics.store(this.comm, this.MSG_SIZE_IDX, 0);
break;
return true;
} else if (state !== this.STATE_RESP_P) {
throw new Error(`Unexpected state ${state}`);
}
// Reset the size and transition to await state.
Atomics.store(this.comm, this.MSG_SIZE_IDX, 0);
this._change_state_locked(this.STATE_AWAIT);
} finally {
this.release_lock();
}
this.change_state_locked(this.STATE_AWAIT);
return false;
});
// Notify webworker
Atomics.notify(this.comm, this.STATE_IDX);
if (done) {
break;
}
}
// Reset the communication channel's state and let the
// webworker know we are done.
this._change_state_locked(this.STATE_IDLE);
this.using_lock(() => {
this.change_state_locked(this.STATE_IDLE);
});
Atomics.notify(this.comm, this.STATE_IDX);
return response;
}
private _change_state_locked(newState: number): void {
private change_state_locked(newState: number): void {
Atomics.store(this.comm, this.STATE_IDX, newState);
}
private wait_for_state(is_ready: (state: number) => boolean, msg: string): number {
private wait_for_state_change_to(is_ready: (state: number) => boolean, msg: string): void {
// Wait for webworker
// - Atomics.wait() is not permissible on the main thread.
for (; ;) {
const lock_state = Atomics.load(this.comm, this.LOCK_IDX);
if (lock_state !== this.LOCK_UNLOCKED)
continue;
const state = Atomics.load(this.comm, this.STATE_IDX);
if (state == this.STATE_REQ_FAILED)
throw new OperationFailedError(`Worker failed during ${msg} with state=${state}`);
const done = this.using_lock(() => {
const state = Atomics.load(this.comm, this.STATE_IDX);
if (state == this.STATE_REQ_FAILED)
throw new OperationFailedError(`Worker failed during ${msg} with state=${state}`);
if (is_ready(state))
return state;
if (is_ready(state))
return true;
});
if (done) return;
}
}
......@@ -361,6 +363,15 @@ class LibraryChannel {
return String.fromCharCode.apply(null, slicedMessage);
}
private using_lock(callback: Function) {
try {
this.acquire_lock();
return callback();
} finally {
this.release_lock();
}
}
private acquire_lock() {
for (; ;) {
const lock_state = Atomics.compareExchange(this.comm, this.LOCK_IDX, this.LOCK_UNLOCKED, this.LOCK_OWNED);
......@@ -381,7 +392,7 @@ class LibraryChannel {
}
}
private static _stringify_err(err: any) {
private static stringify_err(err: any) {
return (err instanceof Error && err.stack !== undefined) ? err.stack : err;
}
......
......@@ -2,7 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
import { setup_proxy_console } from "../logging";
import type { InitCryptoMessageData } from "../crypto-worker";
import type { InitCryptoMessageData } from "../subtle-crypto";
import type { MonoConfig } from "../types";
class FailedOrStoppedLoopError extends Error { }
......@@ -47,14 +47,14 @@ class ChannelWorker {
// Wait for signal to perform operation
let state;
do {
this._wait(this.STATE_IDLE);
this.wait_for_state_to_change_from(this.STATE_IDLE);
state = Atomics.load(this.comm, this.STATE_IDX);
} while (state !== this.STATE_REQ && state !== this.STATE_REQ_P && state !== this.STATE_SHUTDOWN && state !== this.STATE_REQ_FAILED && state !== this.STATE_RESET);
this._throw_if_reset_or_shutdown();
this.throw_if_reset_or_shutdown();
// Read in request
const request_json = this._read_request();
const request_json = this.read_request();
const response: any = {};
try {
// Perform async action based on request
......@@ -67,7 +67,7 @@ class ChannelWorker {
}
// Send response
this._send_response(JSON.stringify(response));
this.send_response(JSON.stringify(response));
} catch (err) {
if (err instanceof FailedOrStoppedLoopError) {
const state = Atomics.load(this.comm, this.STATE_IDX);
......@@ -77,8 +77,9 @@ class ChannelWorker {
console.debug("MONO_WASM: caller failed, resetting worker");
} else {
console.error(`MONO_WASM: Worker failed to handle the request: ${_stringify_err(err)}`);
this._change_state_locked(this.STATE_REQ_FAILED);
Atomics.store(this.comm, this.LOCK_IDX, this.LOCK_UNLOCKED);
this.using_lock(() => {
this.change_state_locked(this.STATE_REQ_FAILED);
});
console.debug("MONO_WASM: set state to failed, now waiting to get RESET");
Atomics.wait(this.comm, this.STATE_IDX, this.STATE_REQ_FAILED);
......@@ -88,9 +89,11 @@ class ChannelWorker {
}
}
Atomics.store(this.comm, this.MSG_SIZE_IDX, 0);
Atomics.store(this.comm, this.LOCK_IDX, this.LOCK_UNLOCKED);
this._change_state_locked(this.STATE_IDLE);
this.using_lock(() => {
Atomics.store(this.comm, this.MSG_SIZE_IDX, 0);
Atomics.store(this.comm, this.LOCK_IDX, this.LOCK_UNLOCKED);
this.change_state_locked(this.STATE_IDLE);
});
}
const state = Atomics.load(this.comm, this.STATE_IDX);
......@@ -102,17 +105,18 @@ class ChannelWorker {
console.error(`MONO_WASM: -- lock is not unlocked at the top of the loop: ${lock_state}, and state: ${state}`);
}
Atomics.store(this.comm, this.MSG_SIZE_IDX, 0);
this._change_state_locked(this.STATE_SHUTDOWN);
this.using_lock(() => {
Atomics.store(this.comm, this.MSG_SIZE_IDX, 0);
this.change_state_locked(this.STATE_SHUTDOWN);
});
console.debug("MONO_WASM: ******* run_message_loop ending");
}
_read_request(): string {
private read_request(): string {
let request = "";
for (; ;) {
this._acquire_lock();
try {
this._throw_if_reset_or_shutdown();
const done = this.using_lock(() => {
this.throw_if_reset_or_shutdown();
// Get the current state and message size
const state = Atomics.load(this.comm, this.STATE_IDX);
......@@ -125,64 +129,67 @@ class ChannelWorker {
// The request is complete.
if (state === this.STATE_REQ) {
break;
return true;
} else if (state !== this.STATE_REQ_P) {
throw new Error(`Unexpected state ${state}`);
}
// Shutdown the worker.
this._throw_if_reset_or_shutdown();
this.throw_if_reset_or_shutdown();
// Reset the size and transition to await state.
Atomics.store(this.comm, this.MSG_SIZE_IDX, 0);
this._change_state_locked(this.STATE_AWAIT);
} finally {
this._release_lock();
this.change_state_locked(this.STATE_AWAIT);
});
if (done) {
break;
}
this._wait(this.STATE_AWAIT);
this.wait_for_state_to_change_from(this.STATE_AWAIT);
}
return request;
}
_send_response(msg: string) {
private send_response(msg: string) {
if (Atomics.load(this.comm, this.STATE_IDX) !== this.STATE_REQ)
throw new WorkerFailedError("WORKER: Invalid sync communication channel state.");
let state; // State machine variable
const msg_len = msg.length;
let msg_written = 0;
for (; ;) {
this._acquire_lock();
try {
const state = this.using_lock(() => {
// Write the message and return how much was written.
const wrote = this._write_to_msg(msg, msg_written, msg_len);
const wrote = this.write_to_msg(msg, msg_written, msg_len);
msg_written += wrote;
// Indicate how much was written to the this.msg buffer.
Atomics.store(this.comm, this.MSG_SIZE_IDX, wrote);
// Indicate if this was the whole message or part of it.
state = msg_written === msg_len ? this.STATE_RESP : this.STATE_RESP_P;
const state = msg_written === msg_len ? this.STATE_RESP : this.STATE_RESP_P;
// Update the state
this._change_state_locked(state);
} finally {
this._release_lock();
}
this.change_state_locked(state);
return state;
});
// Wait for the transition to know the main thread has
// received the response by moving onto a new state.
this._wait(state);
this.wait_for_state_to_change_from(state);
// Done sending response.
if (state === this.STATE_RESP)
if (state === this.STATE_RESP) {
break;
} else if (state !== this.STATE_RESP_P) {
throw new Error(`Unexpected state ${state}`);
}
}
}
_write_to_msg(input: string, start: number, input_len: number) {
private write_to_msg(input: string, start: number, input_len: number) {
let mi = 0;
let ii = start;
while (mi < this.msg_char_len && ii < input_len) {
......@@ -193,33 +200,42 @@ class ChannelWorker {
return ii - start;
}
_change_state_locked(newState: number) {
private change_state_locked(newState: number) {
Atomics.store(this.comm, this.STATE_IDX, newState);
}
_acquire_lock() {
private using_lock(callback: Function) {
try {
this.acquire_lock();
return callback();
} finally {
this.release_lock();
}
}
private acquire_lock() {
for (; ;) {
const lockState = Atomics.compareExchange(this.comm, this.LOCK_IDX, this.LOCK_UNLOCKED, this.LOCK_OWNED);
this._throw_if_reset_or_shutdown();
this.throw_if_reset_or_shutdown();
if (lockState === this.LOCK_UNLOCKED)
return;
}
}
_release_lock() {
private release_lock() {
const result = Atomics.compareExchange(this.comm, this.LOCK_IDX, this.LOCK_OWNED, this.LOCK_UNLOCKED);
if (result !== this.LOCK_OWNED) {
throw new WorkerFailedError("CRYPTO: ChannelWorker tried to release a lock that wasn't acquired: " + result);
}
}
_wait(expected_state: number) {
private wait_for_state_to_change_from(expected_state: number) {
Atomics.wait(this.comm, this.STATE_IDX, expected_state);
this._throw_if_reset_or_shutdown();
this.throw_if_reset_or_shutdown();
}
_throw_if_reset_or_shutdown() {
private throw_if_reset_or_shutdown() {
const state = Atomics.load(this.comm, this.STATE_IDX);
if (state === this.STATE_RESET || state === this.STATE_SHUTDOWN)
throw new FailedOrStoppedLoopError();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册