diff --git a/cli/compilers/js.rs b/cli/compilers/js.rs index 56c9b672e68031d2d268a5654d5058bfa1ff773c..af79690d649807b6d10afd3ccbca17fb8009fb61 100644 --- a/cli/compilers/js.rs +++ b/cli/compilers/js.rs @@ -2,7 +2,6 @@ use crate::compilers::CompiledModule; use crate::compilers::CompiledModuleFuture; use crate::file_fetcher::SourceFile; -use crate::state::ThreadSafeState; use std::str; pub struct JsCompiler {} @@ -10,7 +9,6 @@ pub struct JsCompiler {} impl JsCompiler { pub fn compile_async( self: &Self, - _state: ThreadSafeState, source_file: &SourceFile, ) -> Box { let module = CompiledModule { diff --git a/cli/compilers/json.rs b/cli/compilers/json.rs index e61b3864942d73482834cb7295929d3305c9acb9..22a1d5f3d102301c8e72550e9a3f6661c1d0fc08 100644 --- a/cli/compilers/json.rs +++ b/cli/compilers/json.rs @@ -2,7 +2,6 @@ use crate::compilers::CompiledModule; use crate::compilers::CompiledModuleFuture; use crate::file_fetcher::SourceFile; -use crate::state::ThreadSafeState; use deno::ErrBox; use regex::Regex; use std::str; @@ -15,7 +14,6 @@ pub struct JsonCompiler {} impl JsonCompiler { pub fn compile_async( self: &Self, - _state: ThreadSafeState, source_file: &SourceFile, ) -> Box { let maybe_json_value: serde_json::Result = diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index c2a4ccc4a7c405ac06ecdcc91ee04a08efcf85b8..3c36f14c4c202217d191d2935a7d4ff6846b02cf 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -5,8 +5,8 @@ use crate::diagnostics::Diagnostic; use crate::disk_cache::DiskCache; use crate::file_fetcher::SourceFile; use crate::file_fetcher::SourceFileFetcher; +use crate::global_state::ThreadSafeGlobalState; use crate::msg; -use crate::resources; use crate::source_maps::SourceMapGetter; use crate::startup_data; use crate::state::*; @@ -16,7 +16,7 @@ use deno::Buf; use deno::ErrBox; use deno::ModuleSpecifier; use futures::Future; -use futures::Stream; +use futures::IntoFuture; use regex::Regex; use std::collections::HashSet; use std::fs; @@ -222,16 +222,20 @@ impl TsCompiler { } /// Create a new V8 worker with snapshot of TS compiler and setup compiler's runtime. - fn setup_worker(state: ThreadSafeState) -> Worker { + fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker { + let worker_state = ThreadSafeState::new(global_state.clone(), None, true) + .expect("Unable to create worker state"); + // Count how many times we start the compiler worker. - state.metrics.compiler_starts.fetch_add(1, Ordering::SeqCst); + global_state + .metrics + .compiler_starts + .fetch_add(1, Ordering::SeqCst); let mut worker = Worker::new( "TS".to_string(), startup_data::compiler_isolate_init(), - // TODO(ry) Maybe we should use a separate state for the compiler. - // as was done previously. - state.clone(), + worker_state, ); worker.execute("denoMain()").unwrap(); worker.execute("workerMain()").unwrap(); @@ -241,7 +245,7 @@ impl TsCompiler { pub fn bundle_async( self: &Self, - state: ThreadSafeState, + global_state: ThreadSafeGlobalState, module_name: String, out_file: String, ) -> impl Future { @@ -253,25 +257,21 @@ impl TsCompiler { let root_names = vec![module_name.clone()]; let req_msg = req(root_names, self.config.clone(), Some(out_file)); - let worker = TsCompiler::setup_worker(state.clone()); - let resource = worker.state.resource.clone(); - let compiler_rid = resource.rid; - let first_msg_fut = - resources::post_message_to_worker(compiler_rid, req_msg) - .expect("Bad compiler rid") - .then(move |_| worker) - .then(move |result| { - if let Err(err) = result { - // TODO(ry) Need to forward the error instead of exiting. - eprintln!("{}", err.to_string()); - std::process::exit(1); - } - debug!("Sent message to worker"); - let stream_future = - resources::get_message_stream_from_worker(compiler_rid) - .into_future(); - stream_future.map(|(f, _rest)| f).map_err(|(f, _rest)| f) - }); + let worker = TsCompiler::setup_worker(global_state.clone()); + let worker_ = worker.clone(); + let first_msg_fut = worker + .post_message(req_msg) + .into_future() + .then(move |_| worker) + .then(move |result| { + if let Err(err) = result { + // TODO(ry) Need to forward the error instead of exiting. + eprintln!("{}", err.to_string()); + std::process::exit(1); + } + debug!("Sent message to worker"); + worker_.get_message() + }); first_msg_fut.map_err(|_| panic!("not handled")).and_then( move |maybe_msg: Option| { @@ -312,7 +312,7 @@ impl TsCompiler { /// If compilation is required then new V8 worker is spawned with fresh TS compiler. pub fn compile_async( self: &Self, - state: ThreadSafeState, + global_state: ThreadSafeGlobalState, source_file: &SourceFile, ) -> Box { if self.has_compiled(&source_file.url) { @@ -359,28 +359,26 @@ impl TsCompiler { let root_names = vec![module_url.to_string()]; let req_msg = req(root_names, self.config.clone(), None); - let worker = TsCompiler::setup_worker(state.clone()); - let compiling_job = state.progress.add("Compile", &module_url.to_string()); - let state_ = state.clone(); - - let resource = worker.state.resource.clone(); - let compiler_rid = resource.rid; - let first_msg_fut = - resources::post_message_to_worker(compiler_rid, req_msg) - .expect("Bad compiler rid") - .then(move |_| worker) - .then(move |result| { - if let Err(err) = result { - // TODO(ry) Need to forward the error instead of exiting. - eprintln!("{}", err.to_string()); - std::process::exit(1); - } - debug!("Sent message to worker"); - let stream_future = - resources::get_message_stream_from_worker(compiler_rid) - .into_future(); - stream_future.map(|(f, _rest)| f).map_err(|(f, _rest)| f) - }); + let worker = TsCompiler::setup_worker(global_state.clone()); + let worker_ = worker.clone(); + let compiling_job = global_state + .progress + .add("Compile", &module_url.to_string()); + let global_state_ = global_state.clone(); + + let first_msg_fut = worker + .post_message(req_msg) + .into_future() + .then(move |_| worker) + .then(move |result| { + if let Err(err) = result { + // TODO(ry) Need to forward the error instead of exiting. + eprintln!("{}", err.to_string()); + std::process::exit(1); + } + debug!("Sent message to worker"); + worker_.get_message() + }); let fut = first_msg_fut .map_err(|_| panic!("not handled")) @@ -400,7 +398,7 @@ impl TsCompiler { .and_then(move |_| { // if we are this far it means compilation was successful and we can // load compiled filed from disk - state_ + global_state_ .ts_compiler .get_compiled_module(&source_file_.url) .map_err(|e| { @@ -663,7 +661,7 @@ mod tests { source_code: include_bytes!("../tests/002_hello.ts").to_vec(), }; - let mock_state = ThreadSafeState::mock(vec![ + let mock_state = ThreadSafeGlobalState::mock(vec![ String::from("deno"), String::from("hello.js"), ]); @@ -696,7 +694,7 @@ mod tests { .unwrap() .to_string(); - let state = ThreadSafeState::mock(vec![ + let state = ThreadSafeGlobalState::mock(vec![ String::from("deno"), p.to_string_lossy().into(), String::from("$deno$/bundle.js"), diff --git a/cli/global_state.rs b/cli/global_state.rs new file mode 100644 index 0000000000000000000000000000000000000000..3e102cb4ee4b85c10677ca317533c3078f45a9e3 --- /dev/null +++ b/cli/global_state.rs @@ -0,0 +1,247 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::compilers::CompiledModule; +use crate::compilers::JsCompiler; +use crate::compilers::JsonCompiler; +use crate::compilers::TsCompiler; +use crate::deno_dir; +use crate::deno_error::permission_denied; +use crate::file_fetcher::SourceFileFetcher; +use crate::flags; +use crate::lockfile::Lockfile; +use crate::metrics::Metrics; +use crate::msg; +use crate::permissions::DenoPermissions; +use crate::progress::Progress; +use deno::ErrBox; +use deno::ModuleSpecifier; +use futures::Future; +use std; +use std::env; +use std::ops::Deref; +use std::str; +use std::sync::Arc; +use std::sync::Mutex; + +/// Holds state of the program and can be accessed by V8 isolate. +pub struct ThreadSafeGlobalState(Arc); + +/// This structure represents state of single "deno" program. +/// +/// It is shared by all created workers (thus V8 isolates). +#[cfg_attr(feature = "cargo-clippy", allow(stutter))] +pub struct GlobalState { + /// Vector of CLI arguments - these are user script arguments, all Deno specific flags are removed. + pub argv: Vec, + /// Flags parsed from `argv` contents. + pub flags: flags::DenoFlags, + /// Entry script parsed from CLI arguments. + pub main_module: Option, + /// Permissions parsed from `flags`. + pub permissions: DenoPermissions, + pub dir: deno_dir::DenoDir, + pub metrics: Metrics, + pub progress: Progress, + pub file_fetcher: SourceFileFetcher, + pub js_compiler: JsCompiler, + pub json_compiler: JsonCompiler, + pub ts_compiler: TsCompiler, + pub lockfile: Option>, +} + +impl Clone for ThreadSafeGlobalState { + fn clone(&self) -> Self { + ThreadSafeGlobalState(self.0.clone()) + } +} + +impl Deref for ThreadSafeGlobalState { + type Target = Arc; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl ThreadSafeGlobalState { + pub fn new( + flags: flags::DenoFlags, + argv_rest: Vec, + progress: Progress, + ) -> Result { + let custom_root = env::var("DENO_DIR").map(String::into).ok(); + let dir = deno_dir::DenoDir::new(custom_root)?; + + let file_fetcher = SourceFileFetcher::new( + dir.deps_cache.clone(), + progress.clone(), + !flags.reload, + flags.cache_blacklist.clone(), + flags.no_fetch, + )?; + + let ts_compiler = TsCompiler::new( + file_fetcher.clone(), + dir.gen_cache.clone(), + !flags.reload, + flags.config_path.clone(), + )?; + + let main_module: Option = if argv_rest.len() <= 1 { + None + } else { + let root_specifier = argv_rest[1].clone(); + Some(ModuleSpecifier::resolve_url_or_path(&root_specifier)?) + }; + + // Note: reads lazily from disk on first call to lockfile.check() + let lockfile = if let Some(filename) = &flags.lock { + Some(Mutex::new(Lockfile::new(filename.to_string()))) + } else { + None + }; + + let state = GlobalState { + main_module, + dir, + argv: argv_rest, + permissions: DenoPermissions::from_flags(&flags), + flags, + metrics: Metrics::default(), + progress, + file_fetcher, + ts_compiler, + js_compiler: JsCompiler {}, + json_compiler: JsonCompiler {}, + lockfile, + }; + + Ok(ThreadSafeGlobalState(Arc::new(state))) + } + + pub fn fetch_compiled_module( + self: &Self, + module_specifier: &ModuleSpecifier, + ) -> impl Future { + let state1 = self.clone(); + let state2 = self.clone(); + + self + .file_fetcher + .fetch_source_file_async(&module_specifier) + .and_then(move |out| match out.media_type { + msg::MediaType::Unknown => state1.js_compiler.compile_async(&out), + msg::MediaType::Json => state1.json_compiler.compile_async(&out), + msg::MediaType::TypeScript + | msg::MediaType::TSX + | msg::MediaType::JSX => { + state1.ts_compiler.compile_async(state1.clone(), &out) + } + msg::MediaType::JavaScript => { + if state1.ts_compiler.compile_js { + state1.ts_compiler.compile_async(state1.clone(), &out) + } else { + state1.js_compiler.compile_async(&out) + } + } + }) + .and_then(move |compiled_module| { + if let Some(ref lockfile) = state2.lockfile { + let mut g = lockfile.lock().unwrap(); + if state2.flags.lock_write { + g.insert(&compiled_module); + } else if !g.check(&compiled_module)? { + eprintln!( + "Subresource integrety check failed --lock={}\n{}", + g.filename, compiled_module.name + ); + std::process::exit(10); + } + } + Ok(compiled_module) + }) + } + + #[inline] + pub fn check_read(&self, filename: &str) -> Result<(), ErrBox> { + self.permissions.check_read(filename) + } + + #[inline] + pub fn check_write(&self, filename: &str) -> Result<(), ErrBox> { + self.permissions.check_write(filename) + } + + #[inline] + pub fn check_env(&self) -> Result<(), ErrBox> { + self.permissions.check_env() + } + + #[inline] + pub fn check_net(&self, hostname: &str, port: u16) -> Result<(), ErrBox> { + self.permissions.check_net(hostname, port) + } + + #[inline] + pub fn check_net_url(&self, url: &url::Url) -> Result<(), ErrBox> { + self.permissions.check_net_url(url) + } + + #[inline] + pub fn check_run(&self) -> Result<(), ErrBox> { + self.permissions.check_run() + } + + pub fn check_dyn_import( + self: &Self, + module_specifier: &ModuleSpecifier, + ) -> Result<(), ErrBox> { + let u = module_specifier.as_url(); + match u.scheme() { + "http" | "https" => { + self.check_net_url(u)?; + Ok(()) + } + "file" => { + let filename = u + .to_file_path() + .unwrap() + .into_os_string() + .into_string() + .unwrap(); + self.check_read(&filename)?; + Ok(()) + } + _ => Err(permission_denied()), + } + } + + #[cfg(test)] + pub fn mock(argv: Vec) -> ThreadSafeGlobalState { + ThreadSafeGlobalState::new( + flags::DenoFlags::default(), + argv, + Progress::new(), + ) + .unwrap() + } +} + +#[test] +fn thread_safe() { + fn f(_: S) {} + f(ThreadSafeGlobalState::mock(vec![ + String::from("./deno"), + String::from("hello.js"), + ])); +} + +#[test] +fn import_map_given_for_repl() { + let _result = ThreadSafeGlobalState::new( + flags::DenoFlags { + import_map_path: Some("import_map.json".to_string()), + ..flags::DenoFlags::default() + }, + vec![String::from("./deno")], + Progress::new(), + ); +} diff --git a/cli/js/workers.ts b/cli/js/workers.ts index 281fe619fdc320eadc62b1edddb841719b760c6c..37061063f2b4858a735f1e51a8a49b3c35ae0f0a 100644 --- a/cli/js/workers.ts +++ b/cli/js/workers.ts @@ -65,7 +65,6 @@ export function postMessage(data: any): void { export async function getMessage(): Promise { log("getMessage"); const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE); - if (res.data != null) { return decodeMessage(new Uint8Array(res.data)); } else { diff --git a/cli/lib.rs b/cli/lib.rs index 4f53195081afdb17299390f218967fa3a5936838..b6922c59166bedf001174ebcdbceac449bd8d328 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -15,6 +15,7 @@ extern crate nix; extern crate rand; extern crate serde; extern crate serde_derive; +extern crate tokio; extern crate url; mod checksum; @@ -28,12 +29,14 @@ mod file_fetcher; pub mod flags; pub mod fmt_errors; mod fs; +mod global_state; mod global_timer; mod http_body; mod http_util; mod import_map; mod js; mod lockfile; +mod metrics; pub mod msg; pub mod ops; pub mod permissions; @@ -55,6 +58,7 @@ pub mod worker; use crate::deno_error::js_check; use crate::deno_error::print_err_and_exit; +use crate::global_state::ThreadSafeGlobalState; use crate::progress::Progress; use crate::state::ThreadSafeState; use crate::worker::Worker; @@ -97,11 +101,13 @@ impl log::Log for Logger { fn create_worker_and_state( flags: DenoFlags, argv: Vec, -) -> (Worker, ThreadSafeState) { +) -> (Worker, ThreadSafeGlobalState) { use crate::shell::Shell; use std::sync::Arc; use std::sync::Mutex; + let shell = Arc::new(Mutex::new(Shell::new())); + let progress = Progress::new(); progress.set_callback(move |_done, _completed, _total, status, msg| { if !status.is_empty() { @@ -109,17 +115,23 @@ fn create_worker_and_state( s.status(status, msg).expect("shell problem"); } }); - // TODO(kevinkassimo): maybe make include_deno_namespace also configurable? - let state = ThreadSafeState::new(flags, argv, progress, true) + + let global_state = ThreadSafeGlobalState::new(flags, argv, progress) .map_err(deno_error::print_err_and_exit) .unwrap(); - let worker = Worker::new( - "main".to_string(), - startup_data::deno_isolate_init(), - state.clone(), - ); - (worker, state) + let state = ThreadSafeState::new( + global_state.clone(), + global_state.main_module.clone(), + true, + ) + .map_err(deno_error::print_err_and_exit) + .unwrap(); + + let worker = + Worker::new("main".to_string(), startup_data::deno_isolate_init(), state); + + (worker, global_state) } fn types_command() { @@ -128,7 +140,7 @@ fn types_command() { } fn print_cache_info(worker: Worker) { - let state = worker.state; + let state = &worker.state.global_state; println!( "{} {:?}", @@ -151,10 +163,11 @@ pub fn print_file_info( worker: Worker, module_specifier: &ModuleSpecifier, ) -> impl Future { + let global_state_ = worker.state.global_state.clone(); let state_ = worker.state.clone(); let module_specifier_ = module_specifier.clone(); - state_ + global_state_ .file_fetcher .fetch_source_file_async(&module_specifier) .map_err(|err| println!("{}", err)) @@ -171,7 +184,7 @@ pub fn print_file_info( msg::enum_name_media_type(out.media_type) ); - state_ + global_state_ .clone() .fetch_compiled_module(&module_specifier_) .map_err(|e| { @@ -182,9 +195,9 @@ pub fn print_file_info( .and_then(move |compiled| { if out.media_type == msg::MediaType::TypeScript || (out.media_type == msg::MediaType::JavaScript - && state_.ts_compiler.compile_js) + && global_state_.ts_compiler.compile_js) { - let compiled_source_file = state_ + let compiled_source_file = global_state_ .ts_compiler .get_compiled_source_file(&out.url) .unwrap(); @@ -196,7 +209,7 @@ pub fn print_file_info( ); } - if let Ok(source_map) = state_ + if let Ok(source_map) = global_state_ .clone() .ts_compiler .get_source_map_file(&module_specifier_) @@ -209,7 +222,7 @@ pub fn print_file_info( } if let Some(deps) = - worker.state.modules.lock().unwrap().deps(&compiled.name) + state_.modules.lock().unwrap().deps(&compiled.name) { println!("{}{}", colors::bold("deps:\n".to_string()), deps.name); if let Some(ref depsdeps) = deps.deps { @@ -236,7 +249,7 @@ fn info_command(flags: DenoFlags, argv: Vec) { return print_cache_info(worker); } - let main_module = state.main_module().unwrap(); + let main_module = state.main_module.as_ref().unwrap().clone(); let main_future = lazy(move || { // Setup runtime. js_check(worker.execute("denoMain()")); @@ -259,7 +272,7 @@ fn info_command(flags: DenoFlags, argv: Vec) { fn fetch_command(flags: DenoFlags, argv: Vec) { let (mut worker, state) = create_worker_and_state(flags, argv.clone()); - let main_module = state.main_module().unwrap(); + let main_module = state.main_module.as_ref().unwrap().clone(); let main_future = lazy(move || { // Setup runtime. js_check(worker.execute("denoMain()")); @@ -307,7 +320,7 @@ fn eval_command(flags: DenoFlags, argv: Vec) { fn bundle_command(flags: DenoFlags, argv: Vec) { let (worker, state) = create_worker_and_state(flags, argv); - let main_module = state.main_module().unwrap(); + let main_module = state.main_module.as_ref().unwrap().clone(); assert!(state.argv.len() >= 3); let out_file = state.argv[2].clone(); debug!(">>>>> bundle_async START"); @@ -353,7 +366,7 @@ fn run_script(flags: DenoFlags, argv: Vec) { let use_current_thread = flags.current_thread; let (mut worker, state) = create_worker_and_state(flags, argv); - let main_module = state.main_module().unwrap(); + let main_module = state.main_module.as_ref().unwrap().clone(); // Normal situation of executing a module. let main_future = lazy(move || { // Setup runtime. diff --git a/cli/metrics.rs b/cli/metrics.rs new file mode 100644 index 0000000000000000000000000000000000000000..d1e7567b96e69ab52a3b8e48b54e51279e392196 --- /dev/null +++ b/cli/metrics.rs @@ -0,0 +1,12 @@ +use std::sync::atomic::AtomicUsize; + +#[derive(Default)] +pub struct Metrics { + pub ops_dispatched: AtomicUsize, + pub ops_completed: AtomicUsize, + pub bytes_sent_control: AtomicUsize, + pub bytes_sent_data: AtomicUsize, + pub bytes_received: AtomicUsize, + pub resolve_count: AtomicUsize, + pub compiler_starts: AtomicUsize, +} diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs index 720eb65fc26eee4b9b7100764f7ed6ebe5bb45be..e7d38f3643795bac11c94a06b2c896f852312c2c 100644 --- a/cli/ops/compiler.rs +++ b/cli/ops/compiler.rs @@ -4,6 +4,7 @@ use crate::futures::future::join_all; use crate::futures::Future; use crate::ops::json_op; use crate::state::ThreadSafeState; +use deno::Loader; use deno::*; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { @@ -36,7 +37,7 @@ fn op_cache( let module_specifier = ModuleSpecifier::resolve_url(&args.module_id) .expect("Should be valid module specifier"); - state.ts_compiler.cache_compiler_output( + state.global_state.ts_compiler.cache_compiler_output( &module_specifier, &args.extension, &args.contents, @@ -67,6 +68,7 @@ fn op_fetch_source_files( let resolved_specifier = state.resolve(specifier, &args.referrer, false, is_dyn_import)?; let fut = state + .global_state .file_fetcher .fetch_source_file_async(&resolved_specifier); futures.push(fut); diff --git a/cli/ops/errors.rs b/cli/ops/errors.rs index 2d786b97de1a30b12885ac0688506aca27b00163..3d94c05c48a8fc109e71987da570d19e7bec1379 100644 --- a/cli/ops/errors.rs +++ b/cli/ops/errors.rs @@ -30,7 +30,7 @@ fn op_format_error( _zero_copy: Option, ) -> Result { let args: FormatErrorArgs = serde_json::from_value(args)?; - let error = JSError::from_json(&args.error, &state.ts_compiler); + let error = JSError::from_json(&args.error, &state.global_state.ts_compiler); Ok(JsonOp::Sync(json!({ "error": error.to_string(), @@ -57,7 +57,7 @@ fn op_apply_source_map( args.line.into(), args.column.into(), &mut mappings_map, - &state.ts_compiler, + &state.global_state.ts_compiler, ); Ok(JsonOp::Sync(json!({ diff --git a/cli/ops/os.rs b/cli/ops/os.rs index c50b8eedb69adf1bdeabdafef6101876baabf904..78fd940de2db40183c8e7c9a26e95b3f28d4b6ab 100644 --- a/cli/ops/os.rs +++ b/cli/ops/os.rs @@ -7,7 +7,6 @@ use crate::state::ThreadSafeState; use crate::version; use atty; use deno::*; -use log; use std::collections::HashMap; use std::env; use sys_info; @@ -40,16 +39,15 @@ fn op_start( _args: Value, _zero_copy: Option, ) -> Result { + let gs = &state.global_state; + Ok(JsonOp::Sync(json!({ "cwd": deno_fs::normalize_path(&env::current_dir().unwrap()), "pid": std::process::id(), - "argv": state.argv, - "mainModule": state.main_module().map(|x| x.as_str().to_string()), - "debugFlag": state - .flags - .log_level - .map_or(false, |l| l == log::Level::Debug), - "versionFlag": state.flags.version, + "argv": gs.argv, + "mainModule": gs.main_module.as_ref().map(|x| x.to_string()), + "debugFlag": gs.flags.log_level.map_or(false, |l| l == log::Level::Debug), + "versionFlag": gs.flags.version, "v8Version": version::v8(), "denoVersion": version::DENO, "tsVersion": version::TYPESCRIPT, diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs index 6644ab15903488c02e066534c314b1d174c78982..4a3ba68d474758d239c135b7b242082d355b8d9f 100644 --- a/cli/ops/repl.rs +++ b/cli/ops/repl.rs @@ -44,7 +44,8 @@ fn op_repl_start( let args: ReplStartArgs = serde_json::from_value(args)?; debug!("op_repl_start {}", args.history_file); - let history_path = repl::history_path(&state.dir, &args.history_file); + let history_path = + repl::history_path(&state.global_state.dir, &args.history_file); let repl = repl::Repl::new(history_path); let resource = ReplResource(Arc::new(Mutex::new(repl))); let mut table = resources::lock_resource_table(); diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index 809aa5d9bf07d8909992dae76c436e19df73f0c3..f6dcf8042b871862935262ed5d78482fb395a7c2 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -4,7 +4,6 @@ use crate::deno_error::js_check; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::ops::json_op; -use crate::resources; use crate::startup_data; use crate::state::ThreadSafeState; use crate::worker::Worker; @@ -12,6 +11,7 @@ use deno::*; use futures; use futures::Async; use futures::Future; +use futures::IntoFuture; use futures::Sink; use futures::Stream; use std; @@ -48,18 +48,17 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { } struct GetMessageFuture { - pub state: ThreadSafeState, + state: ThreadSafeState, } impl Future for GetMessageFuture { type Item = Option; - type Error = (); + type Error = ErrBox; fn poll(&mut self) -> Result, Self::Error> { - let mut wc = self.state.worker_channels.lock().unwrap(); - wc.1 - .poll() - .map_err(|err| panic!("worker_channel recv err {:?}", err)) + let mut channels = self.state.worker_channels.lock().unwrap(); + let receiver = &mut channels.receiver; + receiver.poll().map_err(ErrBox::from) } } @@ -93,12 +92,10 @@ fn op_worker_post_message( data: Option, ) -> Result { let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - - let tx = { - let wc = state.worker_channels.lock().unwrap(); - wc.0.clone() - }; - tx.send(d) + let mut channels = state.worker_channels.lock().unwrap(); + let sender = &mut channels.sender; + sender + .send(d) .wait() .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; @@ -132,28 +129,23 @@ fn op_create_worker( let parent_state = state.clone(); + // TODO(bartlomieju): Isn't this wrong? let mut module_specifier = ModuleSpecifier::resolve_url_or_path(specifier)?; - - let mut child_argv = parent_state.argv.clone(); - if !has_source_code { - if let Some(module) = state.main_module() { - module_specifier = - ModuleSpecifier::resolve_import(specifier, &module.to_string())?; - child_argv[1] = module_specifier.to_string(); + if let Some(referrer) = parent_state.main_module.as_ref() { + let referrer = referrer.clone().to_string(); + module_specifier = ModuleSpecifier::resolve_import(specifier, &referrer)?; } } let child_state = ThreadSafeState::new( - parent_state.flags.clone(), - child_argv, - parent_state.progress.clone(), + state.global_state.clone(), + Some(module_specifier.clone()), include_deno_namespace, )?; - let rid = child_state.resource.rid; + let rid = child_state.rid; let name = format!("USER-WORKER-{}", specifier); let deno_main_call = format!("denoMain({})", include_deno_namespace); - let mut worker = Worker::new(name, startup_data::deno_isolate_init(), child_state); js_check(worker.execute(&deno_main_call)); @@ -201,9 +193,8 @@ fn op_host_get_worker_closed( worker.clone() }; - let op = Box::new( - shared_worker_future.then(move |_result| futures::future::ok(json!({}))), - ); + let op = + shared_worker_future.then(move |_result| futures::future::ok(json!({}))); Ok(JsonOp::Async(Box::new(op))) } @@ -222,7 +213,7 @@ fn op_host_get_message( let args: HostGetMessageArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let op = resources::get_message_from_worker(rid) + let op = Worker::get_message_from_resource(rid) .map_err(move |_| -> ErrBox { unimplemented!() }) .and_then(move |maybe_buf| { futures::future::ok(json!({ @@ -250,7 +241,9 @@ fn op_host_post_message( let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - resources::post_message_to_worker(rid, d)? + // TODO: rename to post_message_to_child(rid, d) + Worker::post_message_to_resource(rid, d) + .into_future() .wait() .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; diff --git a/cli/permissions.rs b/cli/permissions.rs index af4c960d7f78d691b27511f477ff2da3cf4d4bdc..1a470f551726329e8e989e3f8a0b1ae7504e6ee2 100644 --- a/cli/permissions.rs +++ b/cli/permissions.rs @@ -52,7 +52,7 @@ impl fmt::Display for PermissionAccessorState { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct PermissionAccessor { state: Arc, } @@ -110,7 +110,7 @@ impl Default for PermissionAccessor { } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct DenoPermissions { // Keep in sync with cli/js/permissions.ts pub allow_read: PermissionAccessor, diff --git a/cli/resources.rs b/cli/resources.rs index 4df4e1798f78b3c7a4e147644482731495723925..ba7795f5d823e694fcb7a84712948bc38c9594cc 100644 --- a/cli/resources.rs +++ b/cli/resources.rs @@ -11,9 +11,6 @@ use crate::deno_error; use crate::deno_error::bad_resource; use crate::http_body::HttpBody; -use crate::state::WorkerChannels; - -use deno::Buf; use deno::ErrBox; pub use deno::Resource as CoreResource; pub use deno::ResourceId; @@ -22,8 +19,6 @@ use deno::ResourceTable; use futures; use futures::Future; use futures::Poll; -use futures::Sink; -use futures::Stream; use reqwest::r#async::Decoder as ReqwestDecoder; use std; use std::io::{Error, Read, Seek, SeekFrom, Write}; @@ -34,7 +29,6 @@ use std::sync::MutexGuard; use tokio; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; -use tokio::sync::mpsc; use tokio_process; use tokio_rustls::client::TlsStream as ClientTlsStream; use tokio_rustls::server::TlsStream as ServerTlsStream; @@ -101,7 +95,6 @@ enum CliResource { ChildStdin(tokio_process::ChildStdin), ChildStdout(tokio_process::ChildStdout), ChildStderr(tokio_process::ChildStderr), - Worker(WorkerChannels), } impl CoreResource for CliResource { @@ -133,7 +126,6 @@ impl CoreResource for CliResource { CliResource::ChildStdin(_) => "childStdin", CliResource::ChildStdout(_) => "childStdout", CliResource::ChildStderr(_) => "childStderr", - CliResource::Worker(_) => "worker", } } } @@ -372,78 +364,6 @@ pub fn add_reqwest_body(body: ReqwestDecoder) -> Resource { Resource { rid } } -pub fn add_worker(wc: WorkerChannels) -> Resource { - let mut table = lock_resource_table(); - let rid = table.add(Box::new(CliResource::Worker(wc))); - Resource { rid } -} - -/// Post message to worker as a host or privilged overlord -pub fn post_message_to_worker( - rid: ResourceId, - buf: Buf, -) -> Result>, ErrBox> { - let mut table = lock_resource_table(); - let repr = table.get_mut::(rid).ok_or_else(bad_resource)?; - match repr { - CliResource::Worker(ref mut wc) => { - let sender = wc.0.clone(); - Ok(sender.send(buf)) - } - _ => Err(bad_resource()), - } -} - -pub struct WorkerReceiver { - rid: ResourceId, -} - -// Invert the dumbness that tokio_process causes by making Child itself a future. -impl Future for WorkerReceiver { - type Item = Option; - type Error = ErrBox; - - fn poll(&mut self) -> Poll, ErrBox> { - let mut table = lock_resource_table(); - let repr = table - .get_mut::(self.rid) - .ok_or_else(bad_resource)?; - match repr { - CliResource::Worker(ref mut wc) => wc.1.poll().map_err(ErrBox::from), - _ => Err(bad_resource()), - } - } -} - -pub fn get_message_from_worker(rid: ResourceId) -> WorkerReceiver { - WorkerReceiver { rid } -} - -pub struct WorkerReceiverStream { - rid: ResourceId, -} - -// Invert the dumbness that tokio_process causes by making Child itself a future. -impl Stream for WorkerReceiverStream { - type Item = Buf; - type Error = ErrBox; - - fn poll(&mut self) -> Poll, ErrBox> { - let mut table = lock_resource_table(); - let repr = table - .get_mut::(self.rid) - .ok_or_else(bad_resource)?; - match repr { - CliResource::Worker(ref mut wc) => wc.1.poll().map_err(ErrBox::from), - _ => Err(bad_resource()), - } - } -} - -pub fn get_message_stream_from_worker(rid: ResourceId) -> WorkerReceiverStream { - WorkerReceiverStream { rid } -} - pub struct ChildResources { pub child_rid: Option, pub stdin_rid: Option, diff --git a/cli/state.rs b/cli/state.rs index 1f44f254f59d73a0188bfeb81773080c9f075e96..544c199b8ead9a77922c2539ac7daa6bf34c6829 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -1,22 +1,15 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::compilers::CompiledModule; -use crate::compilers::JsCompiler; -use crate::compilers::JsonCompiler; -use crate::compilers::TsCompiler; -use crate::deno_dir; use crate::deno_error::permission_denied; -use crate::file_fetcher::SourceFileFetcher; -use crate::flags; +use crate::global_state::ThreadSafeGlobalState; use crate::global_timer::GlobalTimer; use crate::import_map::ImportMap; -use crate::lockfile::Lockfile; -use crate::msg; +use crate::metrics::Metrics; use crate::ops::JsonOp; use crate::permissions::DenoPermissions; -use crate::progress::Progress; use crate::resources; use crate::resources::ResourceId; use crate::worker::Worker; +use crate::worker::WorkerChannels; use deno::Buf; use deno::CoreOp; use deno::ErrBox; @@ -31,31 +24,18 @@ use rand::SeedableRng; use serde_json::Value; use std; use std::collections::HashMap; -use std::env; use std::ops::Deref; use std::str; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; use std::time::Instant; use tokio::sync::mpsc as async_mpsc; -pub type WorkerSender = async_mpsc::Sender; -pub type WorkerReceiver = async_mpsc::Receiver; -pub type WorkerChannels = (WorkerSender, WorkerReceiver); +// TODO: hold references to concrete Workers instead of shared futures of +// those workers? pub type UserWorkerTable = HashMap>; -#[derive(Default)] -pub struct Metrics { - pub ops_dispatched: AtomicUsize, - pub ops_completed: AtomicUsize, - pub bytes_sent_control: AtomicUsize, - pub bytes_sent_data: AtomicUsize, - pub bytes_received: AtomicUsize, - pub resolve_count: AtomicUsize, - pub compiler_starts: AtomicUsize, -} - /// Isolate cannot be passed between threads but ThreadSafeState can. /// ThreadSafeState satisfies Send and Sync. So any state that needs to be /// accessed outside the main V8 thread should be inside ThreadSafeState. @@ -63,34 +43,22 @@ pub struct ThreadSafeState(Arc); #[cfg_attr(feature = "cargo-clippy", allow(stutter))] pub struct State { + pub global_state: ThreadSafeGlobalState, pub modules: Arc>, - pub main_module: Option, - pub dir: deno_dir::DenoDir, - pub argv: Vec, pub permissions: DenoPermissions, - pub flags: flags::DenoFlags, + pub main_module: Option, + pub worker_channels: Mutex, /// When flags contains a `.import_map_path` option, the content of the /// import map file will be resolved and set. pub import_map: Option, pub metrics: Metrics, - pub worker_channels: Mutex, pub global_timer: Mutex, pub workers: Mutex, pub start_time: Instant, /// A reference to this worker's resource. - pub resource: resources::Resource, - /// Reference to global progress bar. - pub progress: Progress, + pub rid: ResourceId, pub seeded_rng: Option>, - - pub file_fetcher: SourceFileFetcher, - pub js_compiler: JsCompiler, - pub json_compiler: JsonCompiler, - pub ts_compiler: TsCompiler, - pub include_deno_namespace: bool, - - pub lockfile: Option>, } impl Clone for ThreadSafeState { @@ -195,158 +163,74 @@ impl Loader for ThreadSafeState { ) -> Box { self.metrics.resolve_count.fetch_add(1, Ordering::SeqCst); let module_url_specified = module_specifier.to_string(); - Box::new(self.fetch_compiled_module(module_specifier).map( - |compiled_module| deno::SourceCodeInfo { + let fut = self + .global_state + .fetch_compiled_module(module_specifier) + .map(|compiled_module| deno::SourceCodeInfo { // Real module name, might be different from initial specifier // due to redirections. code: compiled_module.code, module_url_specified, module_url_found: compiled_module.name, - }, - )) + }); + + Box::new(fut) } } impl ThreadSafeState { pub fn new( - flags: flags::DenoFlags, - argv_rest: Vec, - progress: Progress, + global_state: ThreadSafeGlobalState, + main_module: Option, include_deno_namespace: bool, ) -> Result { - let custom_root = env::var("DENO_DIR").map(String::into).ok(); - let (worker_in_tx, worker_in_rx) = async_mpsc::channel::(1); let (worker_out_tx, worker_out_rx) = async_mpsc::channel::(1); - let internal_channels = (worker_out_tx, worker_in_rx); - let external_channels = (worker_in_tx, worker_out_rx); - let resource = resources::add_worker(external_channels); - - let dir = deno_dir::DenoDir::new(custom_root)?; - - let file_fetcher = SourceFileFetcher::new( - dir.deps_cache.clone(), - progress.clone(), - !flags.reload, - flags.cache_blacklist.clone(), - flags.no_fetch, - )?; - - let ts_compiler = TsCompiler::new( - file_fetcher.clone(), - dir.gen_cache.clone(), - !flags.reload, - flags.config_path.clone(), - )?; - - let main_module: Option = if argv_rest.len() <= 1 { - None - } else { - let root_specifier = argv_rest[1].clone(); - Some(ModuleSpecifier::resolve_url_or_path(&root_specifier)?) + let internal_channels = WorkerChannels { + sender: worker_out_tx, + receiver: worker_in_rx, }; - - let import_map: Option = match &flags.import_map_path { - None => None, - Some(file_path) => Some(ImportMap::load(file_path)?), + let external_channels = WorkerChannels { + sender: worker_in_tx, + receiver: worker_out_rx, }; - let mut seeded_rng = None; - if let Some(seed) = flags.seed { - seeded_rng = Some(Mutex::new(StdRng::seed_from_u64(seed))); - }; + let mut table = resources::lock_resource_table(); + let rid = table.add(Box::new(external_channels)); - let modules = Arc::new(Mutex::new(deno::Modules::new())); + let import_map: Option = + match global_state.flags.import_map_path.as_ref() { + None => None, + Some(file_path) => Some(ImportMap::load(file_path)?), + }; - // Note: reads lazily from disk on first call to lockfile.check() - let lockfile = if let Some(filename) = &flags.lock { - Some(Mutex::new(Lockfile::new(filename.to_string()))) - } else { - None + let seeded_rng = match global_state.flags.seed { + Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))), + None => None, }; + let modules = Arc::new(Mutex::new(deno::Modules::new())); + let permissions = global_state.permissions.clone(); + let state = State { - main_module, + global_state, modules, - dir, - argv: argv_rest, - permissions: DenoPermissions::from_flags(&flags), - flags, + main_module, + permissions, import_map, - metrics: Metrics::default(), worker_channels: Mutex::new(internal_channels), + metrics: Metrics::default(), global_timer: Mutex::new(GlobalTimer::new()), workers: Mutex::new(UserWorkerTable::new()), start_time: Instant::now(), - resource, - progress, + rid, seeded_rng, - file_fetcher, - ts_compiler, - js_compiler: JsCompiler {}, - json_compiler: JsonCompiler {}, include_deno_namespace, - lockfile, }; Ok(ThreadSafeState(Arc::new(state))) } - pub fn fetch_compiled_module( - self: &Self, - module_specifier: &ModuleSpecifier, - ) -> impl Future { - let state1 = self.clone(); - let state2 = self.clone(); - - self - .file_fetcher - .fetch_source_file_async(&module_specifier) - .and_then(move |out| match out.media_type { - msg::MediaType::Unknown => { - state1.js_compiler.compile_async(state1.clone(), &out) - } - msg::MediaType::Json => { - state1.json_compiler.compile_async(state1.clone(), &out) - } - msg::MediaType::TypeScript - | msg::MediaType::TSX - | msg::MediaType::JSX => { - state1.ts_compiler.compile_async(state1.clone(), &out) - } - msg::MediaType::JavaScript => { - if state1.ts_compiler.compile_js { - state1.ts_compiler.compile_async(state1.clone(), &out) - } else { - state1.js_compiler.compile_async(state1.clone(), &out) - } - } - }) - .and_then(move |compiled_module| { - if let Some(ref lockfile) = state2.lockfile { - let mut g = lockfile.lock().unwrap(); - if state2.flags.lock_write { - g.insert(&compiled_module); - } else if !g.check(&compiled_module)? { - eprintln!( - "Subresource integrety check failed --lock={}\n{}", - g.filename, compiled_module.name - ); - std::process::exit(10); - } - } - Ok(compiled_module) - }) - } - - /// Read main module from argv - pub fn main_module(&self) -> Option { - match &self.main_module { - Some(module_specifier) => Some(module_specifier.clone()), - None => None, - } - } - #[inline] pub fn check_read(&self, filename: &str) -> Result<(), ErrBox> { self.permissions.check_read(filename) @@ -403,10 +287,17 @@ impl ThreadSafeState { #[cfg(test)] pub fn mock(argv: Vec) -> ThreadSafeState { + let module_specifier = if argv.is_empty() { + None + } else { + let module_specifier = ModuleSpecifier::resolve_url_or_path(&argv[0]) + .expect("Invalid entry module"); + Some(module_specifier) + }; + ThreadSafeState::new( - flags::DenoFlags::default(), - argv, - Progress::new(), + ThreadSafeGlobalState::mock(argv), + module_specifier, true, ) .unwrap() @@ -445,16 +336,3 @@ fn thread_safe() { String::from("hello.js"), ])); } - -#[test] -fn import_map_given_for_repl() { - let _result = ThreadSafeState::new( - flags::DenoFlags { - import_map_path: Some("import_map.json".to_string()), - ..flags::DenoFlags::default() - }, - vec![String::from("./deno")], - Progress::new(), - true, - ); -} diff --git a/cli/worker.rs b/cli/worker.rs index 4bb68fb95ef9843574911f7ef2c42f7cb5954d94..90fb95af7c1b59e0ecfa9bd3a31cc54f4ba9ffd7 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -1,33 +1,57 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::deno_error::bad_resource; use crate::fmt_errors::JSError; use crate::ops; +use crate::resources; +use crate::resources::CoreResource; +use crate::resources::ResourceId; use crate::state::ThreadSafeState; use deno; +use deno::Buf; use deno::ErrBox; use deno::ModuleSpecifier; use deno::RecursiveLoad; use deno::StartupData; use futures::Async; use futures::Future; +use futures::Poll; +use futures::Sink; +use futures::Stream; use std::env; use std::sync::Arc; use std::sync::Mutex; +use tokio::sync::mpsc; use url::Url; +/// Wraps mpsc channels into a generic resource so they can be referenced +/// from ops and used to facilitate parent-child communication +/// for workers. +pub struct WorkerChannels { + pub sender: mpsc::Sender, + pub receiver: mpsc::Receiver, +} + +impl CoreResource for WorkerChannels { + fn inspect_repr(&self) -> &str { + "worker" + } +} + /// Wraps deno::Isolate to provide source maps, ops for the CLI, and -/// high-level module loading +/// high-level module loading. #[derive(Clone)] pub struct Worker { + pub name: String, isolate: Arc>, pub state: ThreadSafeState, } impl Worker { pub fn new( - _name: String, + name: String, startup_data: StartupData, state: ThreadSafeState, - ) -> Worker { + ) -> Self { let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false))); { let mut i = isolate.lock().unwrap(); @@ -61,12 +85,16 @@ impl Worker { Box::new(load_stream) }); - let state_ = state.clone(); + let global_state_ = state.global_state.clone(); i.set_js_error_create(move |v8_exception| { - JSError::from_v8_exception(v8_exception, &state_.ts_compiler) + JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) }) } - Self { isolate, state } + Self { + name, + isolate, + state, + } } /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". @@ -106,7 +134,7 @@ impl Worker { ) .get_future(isolate); recursive_load.and_then(move |id| -> Result<(), ErrBox> { - worker.state.progress.done(); + worker.state.global_state.progress.done(); if is_prefetch { Ok(()) } else { @@ -115,6 +143,37 @@ impl Worker { } }) } + + /// Post message to worker as a host or privileged overlord + pub fn post_message(self: &Self, buf: Buf) -> Result, ErrBox> { + Worker::post_message_to_resource(self.state.rid, buf) + } + + pub fn post_message_to_resource( + rid: resources::ResourceId, + buf: Buf, + ) -> Result, ErrBox> { + debug!("post message to resource {}", rid); + let mut table = resources::lock_resource_table(); + let worker = table + .get_mut::(rid) + .ok_or_else(bad_resource)?; + let sender = &mut worker.sender; + sender + .send(buf) + .poll() + .map(|_| Async::Ready(())) + .map_err(ErrBox::from) + } + + pub fn get_message(self: &Self) -> WorkerReceiver { + Worker::get_message_from_resource(self.state.rid) + } + + pub fn get_message_from_resource(rid: ResourceId) -> WorkerReceiver { + debug!("get message from resource {}", rid); + WorkerReceiver { rid } + } } impl Future for Worker { @@ -127,16 +186,39 @@ impl Future for Worker { } } +/// This structure wraps worker's resource id to implement future +/// that will return message received from worker or None +/// if worker's channel has been closed. +pub struct WorkerReceiver { + rid: ResourceId, +} + +impl Future for WorkerReceiver { + type Item = Option; + type Error = ErrBox; + + fn poll(&mut self) -> Poll, ErrBox> { + let mut table = resources::lock_resource_table(); + let worker = table + .get_mut::(self.rid) + .ok_or_else(bad_resource)?; + let receiver = &mut worker.receiver; + receiver.poll().map_err(ErrBox::from) + } +} + #[cfg(test)] mod tests { use super::*; use crate::flags; + use crate::flags::DenoFlags; + use crate::global_state::ThreadSafeGlobalState; use crate::progress::Progress; - use crate::resources; use crate::startup_data; use crate::state::ThreadSafeState; use crate::tokio_util; use futures::future::lazy; + use futures::IntoFuture; use std::sync::atomic::Ordering; #[test] @@ -149,13 +231,15 @@ mod tests { let module_specifier = ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); let argv = vec![String::from("./deno"), module_specifier.to_string()]; - let state = ThreadSafeState::new( + let global_state = ThreadSafeGlobalState::new( flags::DenoFlags::default(), argv, Progress::new(), - true, ) .unwrap(); + let state = + ThreadSafeState::new(global_state, Some(module_specifier.clone()), true) + .unwrap(); let state_ = state.clone(); tokio_util::run(lazy(move || { let mut worker = @@ -186,13 +270,12 @@ mod tests { let module_specifier = ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); let argv = vec![String::from("deno"), module_specifier.to_string()]; - let state = ThreadSafeState::new( - flags::DenoFlags::default(), - argv, - Progress::new(), - true, - ) - .unwrap(); + let global_state = + ThreadSafeGlobalState::new(DenoFlags::default(), argv, Progress::new()) + .unwrap(); + let state = + ThreadSafeState::new(global_state, Some(module_specifier.clone()), true) + .unwrap(); let state_ = state.clone(); tokio_util::run(lazy(move || { let mut worker = @@ -227,8 +310,15 @@ mod tests { let argv = vec![String::from("deno"), module_specifier.to_string()]; let mut flags = flags::DenoFlags::default(); flags.reload = true; - let state = - ThreadSafeState::new(flags, argv, Progress::new(), true).unwrap(); + let global_state = + ThreadSafeGlobalState::new(flags, argv, Progress::new()).unwrap(); + let state = ThreadSafeState::new( + global_state.clone(), + Some(module_specifier.clone()), + true, + ) + .unwrap(); + let global_state_ = global_state.clone(); let state_ = state.clone(); tokio_util::run(lazy(move || { let mut worker = Worker::new( @@ -247,10 +337,12 @@ mod tests { }) })); - let metrics = &state_.metrics; - assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 3); + assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3); // Check that we've only invoked the compiler once. - assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 1); + assert_eq!( + global_state_.metrics.compiler_starts.load(Ordering::SeqCst), + 1 + ); drop(http_server_guard); } @@ -285,8 +377,9 @@ mod tests { "#; worker.execute(source).unwrap(); - let resource = worker.state.resource.clone(); - let resource_ = resource.clone(); + let worker_ = worker.clone(); + let rid = worker.state.rid; + let resource_ = resources::Resource { rid }; tokio::spawn(lazy(move || { worker.then(move |r| -> Result<(), ()> { @@ -298,14 +391,10 @@ mod tests { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = resources::post_message_to_worker(resource.rid, msg) - .expect("Bad resource") - .wait(); + let r = worker_.post_message(msg).into_future().wait(); assert!(r.is_ok()); - let maybe_msg = resources::get_message_from_worker(resource.rid) - .wait() - .unwrap(); + let maybe_msg = worker_.get_message().wait().unwrap(); assert!(maybe_msg.is_some()); // Check if message received is [1, 2, 3] in json assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); @@ -314,9 +403,7 @@ mod tests { .to_string() .into_boxed_str() .into_boxed_bytes(); - let r = resources::post_message_to_worker(resource.rid, msg) - .expect("Bad resource") - .wait(); + let r = worker_.post_message(msg).into_future().wait(); assert!(r.is_ok()); }) } @@ -329,8 +416,9 @@ mod tests { .execute("onmessage = () => { delete window.onmessage; }") .unwrap(); - let resource = worker.state.resource.clone(); - let rid = resource.rid; + let rid = worker.state.rid; + let resource = resources::Resource { rid }; + let worker_ = worker.clone(); let worker_future = worker .then(move |r| -> Result<(), ()> { @@ -345,9 +433,7 @@ mod tests { tokio::spawn(lazy(move || worker_future_.then(|_| Ok(())))); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = resources::post_message_to_worker(rid, msg) - .expect("Bad resource") - .wait(); + let r = worker_.post_message(msg).into_future().wait(); assert!(r.is_ok()); debug!("rid {:?}", rid);