diff --git a/BUILD.gn b/BUILD.gn index 4a7293262d802cf29c81c1b60ed97d0cca4ffda6..f92112e19a9f940b6581c1daf51d8fa741b7cd58 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -54,6 +54,7 @@ main_extern = [ "$rust_build:tempfile", "$rust_build:rand", "$rust_build:tokio", + "$rust_build:tokio_executor", "$rust_build:url", "$rust_build:remove_dir_all", "$rust_build:dirs", diff --git a/js/globals.ts b/js/globals.ts index 3e40db06cdc3de8b81598f72cf7e833dec9ffa58..d88e1b28de8e267c36e364edb4574c183565b5b9 100644 --- a/js/globals.ts +++ b/js/globals.ts @@ -56,8 +56,6 @@ declare global { export const window = globalEval("this"); window.window = window; -window.libdeno = null; - window.setTimeout = timers.setTimeout; window.setInterval = timers.setInterval; window.clearTimeout = timers.clearTimer; diff --git a/src/handlers.rs b/src/handlers.rs index 623c64110b28504e6e5d4922bc65fa8ba1e6f7f3..c914df36d0f9825b4dfedec1db08a4a10844ea0e 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,46 +1,44 @@ // Copyright 2018 the Deno authors. All rights reserved. MIT license. + use errors::DenoError; use errors::DenoResult; -use flatbuffers::FlatBufferBuilder; use fs as deno_fs; +use isolate::Buf; +use isolate::IsolateState; +use isolate::Op; +use msg; + +use flatbuffers::FlatBufferBuilder; use futures; use futures::sync::oneshot; use hyper; use hyper::rt::{Future, Stream}; use hyper::Client; -use isolate::from_c; -use libdeno; -use libdeno::{deno_buf, isolate}; -use msg; use remove_dir_all::remove_dir_all; use std; use std::fs; #[cfg(any(unix))] use std::os::unix::fs::PermissionsExt; use std::path::Path; +use std::sync::Arc; use std::time::UNIX_EPOCH; use std::time::{Duration, Instant}; use tokio::timer::Delay; -// Buf represents a byte array returned from a "Op". -// The message might be empty (which will be translated into a null object on -// the javascript side) or it is a heap allocated opaque sequence of bytes. -// Usually a flatbuffer message. -type Buf = Option>; - -// JS promises in Deno map onto a specific Future -// which yields either a DenoError or a byte array. -type Op = Future; - type OpResult = DenoResult; // TODO Ideally we wouldn't have to box the Op being returned. // The box is just to make it easier to get a prototype refactor working. -type Handler = fn(i: *const isolate, base: &msg::Base) -> Box; +type Handler = fn(state: Arc, base: &msg::Base) -> Box; -pub extern "C" fn msg_from_js(i: *const isolate, buf: deno_buf) { - let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) }; +// Hopefully Rust optimizes this away. +fn empty_buf() -> Buf { + Box::new([]) +} + +pub fn msg_from_js(state: Arc, bytes: &[u8]) -> (bool, Box) { let base = msg::get_root_as_base(bytes); + let is_sync = base.sync(); let msg_type = base.msg_type(); let cmd_id = base.cmd_id(); let handler: Handler = match msg_type { @@ -68,73 +66,51 @@ pub extern "C" fn msg_from_js(i: *const isolate, buf: deno_buf) { )), }; - let future = handler(i, &base); - let future = future.or_else(move |err| { - // No matter whether we got an Err or Ok, we want a serialized message to - // send back. So transform the DenoError into a deno_buf. - let builder = &mut FlatBufferBuilder::new(); - let errmsg_offset = builder.create_string(&format!("{}", err)); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - error: Some(errmsg_offset), - error_kind: err.kind(), - ..Default::default() - }, - )) - }); - - let isolate = from_c(i); - if base.sync() { - // Execute future synchronously. - // println!("sync handler {}", msg::enum_name_any(msg_type)); - let maybe_box_u8 = future.wait().unwrap(); - match maybe_box_u8 { - None => {} - Some(box_u8) => { - let buf = deno_buf_from(box_u8); - // Set the synchronous response, the value returned from isolate.send(). - unsafe { libdeno::deno_set_response(i, buf) } - } - } - } else { - // Execute future asynchornously. - let future = future.and_then(move |maybe_box_u8| { - let buf = match maybe_box_u8 { - Some(box_u8) => deno_buf_from(box_u8), - None => { - // async RPCs that return None still need to - // send a message back to signal completion. - let builder = &mut FlatBufferBuilder::new(); - deno_buf_from( - serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - ).unwrap(), - ) - } + let op: Box = handler(state.clone(), &base); + let boxed_op = Box::new( + op.or_else(move |err: DenoError| -> DenoResult { + debug!("op err {}", err); + // No matter whether we got an Err or Ok, we want a serialized message to + // send back. So transform the DenoError into a deno_buf. + let builder = &mut FlatBufferBuilder::new(); + let errmsg_offset = builder.create_string(&format!("{}", err)); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + error: Some(errmsg_offset), + error_kind: err.kind(), + ..Default::default() + }, + )) + }).and_then(move |buf: Buf| -> DenoResult { + // Handle empty responses. For sync responses we just want + // to send null. For async we want to send a small message + // with the cmd_id. + let buf = if is_sync || buf.len() > 0 { + buf + } else { + // async RPCs that return empty still need to + // send a message back to signal completion. + let builder = &mut FlatBufferBuilder::new(); + serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + ) }; - // TODO(ry) make this thread safe. - unsafe { libdeno::deno_send(i, buf) }; - Ok(()) - }); - isolate.rt.spawn(future); - } -} + Ok(buf) + }), + ); -fn deno_buf_from(x: Box<[u8]>) -> deno_buf { - let len = x.len(); - let ptr = Box::into_raw(x); - deno_buf { - alloc_ptr: 0 as *mut u8, - alloc_len: 0, - data_ptr: ptr as *mut u8, - data_len: len, - } + debug!( + "msg_from_js {} sync {}", + msg::enum_name_any(msg_type), + base.sync() + ); + return (base.sync(), boxed_op); } fn permission_denied() -> DenoError { @@ -151,16 +127,15 @@ fn not_implemented() -> DenoError { )) } -fn handle_exit(_i: *const isolate, base: &msg::Base) -> Box { +fn handle_exit(_config: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_exit().unwrap(); std::process::exit(msg.code()) } -fn handle_start(i: *const isolate, base: &msg::Base) -> Box { - let isolate = from_c(i); +fn handle_start(state: Arc, base: &msg::Base) -> Box { let mut builder = FlatBufferBuilder::new(); - let argv = isolate.argv.iter().map(|s| s.as_str()).collect::>(); + let argv = state.argv.iter().map(|s| s.as_str()).collect::>(); let argv_off = builder.create_vector_of_strings(argv.as_slice()); let cwd_path = std::env::current_dir().unwrap(); @@ -172,8 +147,8 @@ fn handle_start(i: *const isolate, base: &msg::Base) -> Box { &msg::StartResArgs { cwd: Some(cwd_off), argv: Some(argv_off), - debug_flag: isolate.flags.log_debug, - recompile_flag: isolate.flags.recompile, + debug_flag: state.flags.log_debug, + recompile_flag: state.flags.recompile, ..Default::default() }, ); @@ -200,7 +175,7 @@ fn serialize_response( let data = builder.finished_data(); // println!("serialize_response {:x?}", data); let vec = data.to_vec(); - Some(vec.into_boxed_slice()) + vec.into_boxed_slice() } fn ok_future(buf: Buf) -> Box { @@ -213,22 +188,17 @@ fn odd_future(err: DenoError) -> Box { } // https://github.com/denoland/isolate/blob/golang/os.go#L100-L154 -fn handle_code_fetch(i: *const isolate, base: &msg::Base) -> Box { +fn handle_code_fetch(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_code_fetch().unwrap(); let cmd_id = base.cmd_id(); let module_specifier = msg.module_specifier().unwrap(); let containing_file = msg.containing_file().unwrap(); - let isolate = from_c(i); - assert_eq!( - isolate.dir.root.join("gen"), - isolate.dir.gen, - "Sanity check" - ); + assert_eq!(state.dir.root.join("gen"), state.dir.gen, "Sanity check"); Box::new(futures::future::result(|| -> OpResult { let builder = &mut FlatBufferBuilder::new(); - let out = isolate.dir.code_fetch(module_specifier, containing_file)?; + let out = state.dir.code_fetch(module_specifier, containing_file)?; let mut msg_args = msg::CodeFetchResArgs { module_name: Some(builder.create_string(&out.module_name)), filename: Some(builder.create_string(&out.filename)), @@ -255,36 +225,34 @@ fn handle_code_fetch(i: *const isolate, base: &msg::Base) -> Box { } // https://github.com/denoland/isolate/blob/golang/os.go#L156-L169 -fn handle_code_cache(i: *const isolate, base: &msg::Base) -> Box { +fn handle_code_cache(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_code_cache().unwrap(); let filename = msg.filename().unwrap(); let source_code = msg.source_code().unwrap(); let output_code = msg.output_code().unwrap(); Box::new(futures::future::result(|| -> OpResult { - let isolate = from_c(i); - isolate.dir.code_cache(filename, source_code, output_code)?; - Ok(None) + state.dir.code_cache(filename, source_code, output_code)?; + Ok(empty_buf()) }())) } -fn handle_set_env(i: *const isolate, base: &msg::Base) -> Box { +fn handle_set_env(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_set_env().unwrap(); let key = msg.key().unwrap(); let value = msg.value().unwrap(); - let isolate = from_c(i); - if !isolate.flags.allow_env { + if !state.flags.allow_env { return odd_future(permission_denied()); } std::env::set_var(key, value); - ok_future(None) + ok_future(empty_buf()) } -fn handle_env(i: *const isolate, base: &msg::Base) -> Box { - let isolate = from_c(i); +fn handle_env(state: Arc, base: &msg::Base) -> Box { let cmd_id = base.cmd_id(); - if !isolate.flags.allow_env { + + if !state.flags.allow_env { return odd_future(permission_denied()); } @@ -322,22 +290,23 @@ fn handle_env(i: *const isolate, base: &msg::Base) -> Box { )) } -fn handle_fetch_req(i: *const isolate, base: &msg::Base) -> Box { +fn handle_fetch_req(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_fetch_req().unwrap(); let cmd_id = base.cmd_id(); let id = msg.id(); let url = msg.url().unwrap(); - let isolate = from_c(i); - if !isolate.flags.allow_net { + if !state.flags.allow_net { return odd_future(permission_denied()); } let url = url.parse::().unwrap(); let client = Client::new(); + debug!("Before fetch {}", url); let future = client.get(url).and_then(move |res| { let status = res.status().as_u16() as i32; + debug!("fetch {}", status); let headers = { let map = res.headers(); @@ -361,6 +330,7 @@ fn handle_fetch_req(i: *const isolate, base: &msg::Base) -> Box { let future = future.map_err(|err| -> DenoError { err.into() }).and_then( move |(status, body, headers)| { + debug!("fetch body "); let builder = &mut FlatBufferBuilder::new(); // Send the first message without a body. This is just to indicate // what status code. @@ -422,7 +392,7 @@ where (delay_task, cancel_tx) } -fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box { +fn handle_make_temp_dir(state: Arc, base: &msg::Base) -> Box { let base = Box::new(*base); let msg = base.msg_as_make_temp_dir().unwrap(); let cmd_id = base.cmd_id(); @@ -430,8 +400,7 @@ fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box { let prefix = msg.prefix(); let suffix = msg.suffix(); - let isolate = from_c(i); - if !isolate.flags.allow_write { + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use blocking() here. @@ -461,28 +430,28 @@ fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box { }())) } -fn handle_mkdir(i: *const isolate, base: &msg::Base) -> Box { +fn handle_mkdir(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_mkdir().unwrap(); let mode = msg.mode(); let path = msg.path().unwrap(); - let isolate = from_c(i); - if !isolate.flags.allow_write { + + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use tokio_threadpool. Box::new(futures::future::result(|| -> OpResult { debug!("handle_mkdir {}", path); deno_fs::mkdir(Path::new(path), mode)?; - Ok(None) + Ok(empty_buf()) }())) } -fn handle_remove(i: *const isolate, base: &msg::Base) -> Box { +fn handle_remove(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_remove().unwrap(); let path = msg.path().unwrap(); let recursive = msg.recursive(); - let isolate = from_c(i); - if !isolate.flags.allow_write { + + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use tokio_threadpool. @@ -499,12 +468,12 @@ fn handle_remove(i: *const isolate, base: &msg::Base) -> Box { fs::remove_dir(&path_)?; } } - Ok(None) + Ok(empty_buf()) }())) } // Prototype https://github.com/denoland/isolate/blob/golang/os.go#L171-L184 -fn handle_read_file(_i: *const isolate, base: &msg::Base) -> Box { +fn handle_read_file(_config: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_read_file().unwrap(); let cmd_id = base.cmd_id(); let filename = String::from(msg.filename().unwrap()); @@ -554,7 +523,7 @@ fn get_mode(_perm: fs::Permissions) -> u32 { 0 } -fn handle_stat(_i: *const isolate, base: &msg::Base) -> Box { +fn handle_stat(_config: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_stat().unwrap(); let cmd_id = base.cmd_id(); let filename = String::from(msg.filename().unwrap()); @@ -597,48 +566,49 @@ fn handle_stat(_i: *const isolate, base: &msg::Base) -> Box { }())) } -fn handle_write_file(i: *const isolate, base: &msg::Base) -> Box { +fn handle_write_file(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_write_file().unwrap(); let filename = String::from(msg.filename().unwrap()); let data = msg.data().unwrap(); let perm = msg.perm(); - let isolate = from_c(i); - if !isolate.flags.allow_write { + if !state.flags.allow_write { return odd_future(permission_denied()); } + Box::new(futures::future::result(|| -> OpResult { debug!("handle_write_file {}", filename); deno_fs::write_file(Path::new(&filename), data, perm)?; - Ok(None) + Ok(empty_buf()) }())) } -fn remove_timer(i: *const isolate, timer_id: u32) { - let isolate = from_c(i); - isolate.timers.remove(&timer_id); +fn remove_timer(state: Arc, timer_id: u32) { + let mut timers = state.timers.lock().unwrap(); + timers.remove(&timer_id); } // Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L25-L39 -fn handle_timer_start(i: *const isolate, base: &msg::Base) -> Box { +fn handle_timer_start(state: Arc, base: &msg::Base) -> Box { debug!("handle_timer_start"); let msg = base.msg_as_timer_start().unwrap(); let cmd_id = base.cmd_id(); let timer_id = msg.id(); let delay = msg.delay(); - let isolate = from_c(i); + let config2 = state.clone(); let future = { let (delay_task, cancel_delay) = set_timeout( move || { - remove_timer(i, timer_id); + remove_timer(config2, timer_id); }, delay, ); - isolate.timers.insert(timer_id, cancel_delay); + let mut timers = state.timers.lock().unwrap(); + timers.insert(timer_id, cancel_delay); delay_task }; - Box::new(future.then(move |result| { + let r = Box::new(future.then(move |result| { let builder = &mut FlatBufferBuilder::new(); let msg = msg::TimerReady::create( builder, @@ -657,20 +627,20 @@ fn handle_timer_start(i: *const isolate, base: &msg::Base) -> Box { ..Default::default() }, )) - })) + })); + r } // Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L40-L43 -fn handle_timer_clear(i: *const isolate, base: &msg::Base) -> Box { +fn handle_timer_clear(state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_timer_clear().unwrap(); debug!("handle_timer_clear"); - remove_timer(i, msg.id()); - ok_future(None) + remove_timer(state, msg.id()); + ok_future(empty_buf()) } -fn handle_rename(i: *const isolate, base: &msg::Base) -> Box { - let isolate = from_c(i); - if !isolate.flags.allow_write { +fn handle_rename(state: Arc, base: &msg::Base) -> Box { + if !state.flags.allow_write { return odd_future(permission_denied()); } let msg = base.msg_as_rename().unwrap(); @@ -679,13 +649,12 @@ fn handle_rename(i: *const isolate, base: &msg::Base) -> Box { Box::new(futures::future::result(|| -> OpResult { debug!("handle_rename {} {}", oldpath, newpath); fs::rename(Path::new(&oldpath), Path::new(&newpath))?; - Ok(None) + Ok(empty_buf()) }())) } -fn handle_symlink(i: *const isolate, base: &msg::Base) -> Box { - let deno = from_c(i); - if !deno.flags.allow_write { +fn handle_symlink(state: Arc, base: &msg::Base) -> Box { + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use type for Windows. @@ -699,12 +668,12 @@ fn handle_symlink(i: *const isolate, base: &msg::Base) -> Box { debug!("handle_symlink {} {}", oldname, newname); #[cfg(any(unix))] std::os::unix::fs::symlink(Path::new(&oldname), Path::new(&newname))?; - Ok(None) + Ok(empty_buf()) }())) } } -fn handle_read_link(_i: *const isolate, base: &msg::Base) -> Box { +fn handle_read_link(_state: Arc, base: &msg::Base) -> Box { let msg = base.msg_as_readlink().unwrap(); let cmd_id = base.cmd_id(); let name = String::from(msg.name().unwrap()); diff --git a/src/http.rs b/src/http.rs index 5907b35ed66cf00be6cb33fdce7acb6c1a5f7103..3b5ede10e3fef10161c9f6f45bc7acc2e4ced405 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,10 +1,12 @@ // Copyright 2018 the Deno authors. All rights reserved. MIT license. use errors::DenoResult; +use tokio_util; use futures::Future; use futures::Stream; use hyper; +use hyper::client::Client; use hyper::client::HttpConnector; use hyper::Uri; use hyper_rustls; @@ -29,21 +31,24 @@ pub fn get_client() -> Client { pub fn fetch_sync_string(module_name: &str) -> DenoResult { let url = module_name.parse::().unwrap(); let client = get_client(); - - // TODO Use Deno's RT - let mut rt = Runtime::new().unwrap(); - let body = rt.block_on( - client - .get(url) - .and_then(|response| response.into_body().concat2()), - )?; + let future = client + .get(url) + .and_then(|response| response.into_body().concat2()); + let body = tokio_util::block_on(future)?; Ok(String::from_utf8(body.to_vec()).unwrap()) } #[test] fn test_fetch_sync_string() { // Relies on external http server. See tools/http_server.py - let p = fetch_sync_string("http://localhost:4545/package.json").unwrap(); - println!("package.json len {}", p.len()); - assert!(p.len() > 1); + use futures; + + tokio_util::init(|| { + tokio_util::block_on(futures::future::lazy(|| -> DenoResult<()> { + let p = fetch_sync_string("http://127.0.0.1:4545/package.json")?; + println!("package.json len {}", p.len()); + assert!(p.len() > 1); + Ok(()) + })).unwrap(); + }); } diff --git a/src/isolate.rs b/src/isolate.rs index 5daf45701b72310ed5ab8dc88b4775bde769a69e..64bec5ddf11c9f811866f20934e0caf5796675ec 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -1,54 +1,113 @@ // Copyright 2018 the Deno authors. All rights reserved. MIT license. + +// Do not use FlatBuffers in this module. +// TODO Currently this module uses Tokio, but it would be nice if they were +// decoupled. + use deno_dir; +use errors::DenoError; use flags; +use libdeno; + use futures; -use handlers; +use futures::Future; use libc::c_void; -use libdeno; use std; use std::collections::HashMap; use std::ffi::CStr; use std::ffi::CString; +use std::sync::mpsc; +use std::sync::Arc; +use std::sync::Mutex; use tokio; +use tokio_util; type DenoException<'a> = &'a str; +// Buf represents a byte array returned from a "Op". +// The message might be empty (which will be translated into a null object on +// the javascript side) or it is a heap allocated opaque sequence of bytes. +// Usually a flatbuffer message. +pub type Buf = Box<[u8]>; + +// JS promises in Deno map onto a specific Future +// which yields either a DenoError or a byte array. +pub type Op = Future + Send; + +// Returns (is_sync, op) +pub type Dispatch = fn(state: Arc, buf: &[u8]) -> (bool, Box); + pub struct Isolate { - pub ptr: *const libdeno::isolate, + ptr: *const libdeno::isolate, + dispatch: Dispatch, + rx: mpsc::Receiver, + ntasks: i32, + pub state: Arc, +} + +// Isolate cannot be passed between threads but IsolateState can. So any state that +// needs to be accessed outside the main V8 thread should be inside IsolateState. +pub struct IsolateState { pub dir: deno_dir::DenoDir, - pub rt: tokio::runtime::current_thread::Runtime, - pub timers: HashMap>, + pub timers: Mutex>>, pub argv: Vec, pub flags: flags::DenoFlags, + tx: Mutex>>, +} + +impl IsolateState { + // Thread safe. + fn send_to_js(&self, buf: Buf) { + let mut g = self.tx.lock().unwrap(); + let maybe_tx = g.as_mut(); + assert!(maybe_tx.is_some(), "Expected tx to not be deleted."); + let tx = maybe_tx.unwrap(); + tx.send(buf).expect("tx.send error"); + } } static DENO_INIT: std::sync::Once = std::sync::ONCE_INIT; impl Isolate { - pub fn new(argv: Vec) -> Box { + pub fn new(argv: Vec, dispatch: Dispatch) -> Box { DENO_INIT.call_once(|| { unsafe { libdeno::deno_init() }; }); let (flags, argv_rest) = flags::set_flags(argv); - let mut deno_box = Box::new(Isolate { + // This channel handles sending async messages back to the runtime. + let (tx, rx) = mpsc::channel::(); + + let mut isolate = Box::new(Isolate { ptr: 0 as *const libdeno::isolate, - dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(), - rt: tokio::runtime::current_thread::Runtime::new().unwrap(), - timers: HashMap::new(), - argv: argv_rest, - flags, + dispatch, + rx, + ntasks: 0, + state: Arc::new(IsolateState { + dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(), + timers: Mutex::new(HashMap::new()), + argv: argv_rest, + flags, + tx: Mutex::new(Some(tx)), + }), }); - (*deno_box).ptr = unsafe { + (*isolate).ptr = unsafe { libdeno::deno_new( - deno_box.as_ref() as *const _ as *const c_void, - handlers::msg_from_js, + isolate.as_ref() as *const _ as *const c_void, + pre_dispatch, ) }; - deno_box + isolate + } + + pub fn from_c<'a>(d: *const libdeno::isolate) -> &'a mut Isolate { + let ptr = unsafe { libdeno::deno_get_data(d) }; + let ptr = ptr as *mut Isolate; + let isolate_box = unsafe { Box::from_raw(ptr) }; + Box::leak(isolate_box) } pub fn execute( @@ -68,6 +127,42 @@ impl Isolate { } Ok(()) } + + pub fn set_response(&self, buf: Buf) { + unsafe { libdeno::deno_set_response(self.ptr, buf.into()) } + } + + pub fn send(&self, buf: Buf) { + unsafe { libdeno::deno_send(self.ptr, buf.into()) }; + } + + // TODO Use Park abstraction? Note at time of writing Tokio default runtime + // does not have new_with_park(). + pub fn event_loop(&mut self) { + // Main thread event loop. + while !self.is_idle() { + let buf = self.rx.recv().unwrap(); + // Receiving a message on rx exactly corresponds to an async task + // completing. + self.ntasks_decrement(); + // Call into JS with the buf. + self.send(buf); + } + } + + fn ntasks_increment(&mut self) { + assert!(self.ntasks >= 0); + self.ntasks = self.ntasks + 1; + } + + fn ntasks_decrement(&mut self) { + self.ntasks = self.ntasks - 1; + assert!(self.ntasks >= 0); + } + + fn is_idle(&self) -> bool { + self.ntasks == 0 + } } impl Drop for Isolate { @@ -76,22 +171,107 @@ impl Drop for Isolate { } } -pub fn from_c<'a>(i: *const libdeno::isolate) -> &'a mut Isolate { - let ptr = unsafe { libdeno::deno_get_data(i) }; - let ptr = ptr as *mut Isolate; - let isolate_box = unsafe { Box::from_raw(ptr) }; - Box::leak(isolate_box) +/// Converts Rust Buf to libdeno deno_buf. +impl From for libdeno::deno_buf { + fn from(x: Buf) -> libdeno::deno_buf { + let len = x.len(); + let ptr = Box::into_raw(x); + libdeno::deno_buf { + alloc_ptr: 0 as *mut u8, + alloc_len: 0, + data_ptr: ptr as *mut u8, + data_len: len, + } + } } -#[test] -fn test_c_to_rust() { - let argv = vec![String::from("./deno"), String::from("hello.js")]; - let isolate = Isolate::new(argv); - let isolate2 = from_c(isolate.ptr); - assert_eq!(isolate.ptr, isolate2.ptr); - assert_eq!( - isolate.dir.root.join("gen"), - isolate.dir.gen, - "Sanity check" - ); +// Dereferences the C pointer into the Rust Isolate object. +extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) { + let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) }; + let isolate = Isolate::from_c(d); + let dispatch = isolate.dispatch; + let (is_sync, op) = dispatch(isolate.state.clone(), bytes); + + if is_sync { + // Execute op synchronously. + let buf = tokio_util::block_on(op).unwrap(); + if buf.len() != 0 { + // Set the synchronous response, the value returned from isolate.send(). + isolate.set_response(buf); + } + } else { + // Execute op asynchronously. + let state = isolate.state.clone(); + + // TODO Ideally Tokio would could tell us how many tasks are executing, but + // it cannot currently. Therefore we track top-level promises/tasks + // manually. + isolate.ntasks_increment(); + + let task = op + .and_then(move |buf| { + state.send_to_js(buf); + Ok(()) + }).map_err(|_| ()); + tokio::spawn(task); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_c_to_rust() { + let argv = vec![String::from("./deno"), String::from("hello.js")]; + let isolate = Isolate::new(argv, unreachable_dispatch); + let isolate2 = Isolate::from_c(isolate.ptr); + assert_eq!(isolate.ptr, isolate2.ptr); + assert_eq!( + isolate.state.dir.root.join("gen"), + isolate.state.dir.gen, + "Sanity check" + ); + } + + fn unreachable_dispatch( + _state: Arc, + _buf: &[u8], + ) -> (bool, Box) { + unreachable!(); + } + + #[test] + fn test_dispatch_sync() { + let argv = vec![String::from("./deno"), String::from("hello.js")]; + let mut isolate = Isolate::new(argv, dispatch_sync); + tokio_util::init(|| { + isolate + .execute( + "y.js", + r#" + const m = new Uint8Array([4, 5, 6]); + let n = libdeno.send(m); + if (!(n.byteLength === 3 && + n[0] === 1 && + n[1] === 2 && + n[2] === 3)) { + throw Error("assert error"); + } + "#, + ).expect("execute error"); + isolate.event_loop(); + }); + } + + fn dispatch_sync(_state: Arc, buf: &[u8]) -> (bool, Box) { + assert_eq!(buf[0], 4); + assert_eq!(buf[1], 5); + assert_eq!(buf[2], 6); + // Send back some sync response. + let vec: Vec = vec![1, 2, 3]; + let buf = vec.into_boxed_slice(); + let op = Box::new(futures::future::ok(buf)); + (true, op) + } } diff --git a/src/main.rs b/src/main.rs index 1f9ead0191f0f38a680f2c938cd0a8516a3105e9..b60f8051378051ccd14d0b01d1888a6ef01ae226 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ extern crate msg_rs as msg; extern crate rand; extern crate tempfile; extern crate tokio; +extern crate tokio_executor; extern crate url; #[macro_use] extern crate lazy_static; @@ -25,9 +26,9 @@ pub mod handlers; mod http; mod isolate; mod libdeno; +mod tokio_util; mod version; -use isolate::Isolate; use std::env; static LOGGER: Logger = Logger; @@ -49,18 +50,16 @@ impl log::Log for Logger { fn main() { log::set_logger(&LOGGER).unwrap(); - let args = env::args().collect(); - let mut isolate = Isolate::new(args); - flags::process(&isolate.flags); - - isolate - .execute("deno_main.js", "denoMain();") - .unwrap_or_else(|err| { - error!("{}", err); - std::process::exit(1); - }); - - // Start the Tokio event loop - isolate.rt.run().expect("err"); + let mut isolate = isolate::Isolate::new(args, handlers::msg_from_js); + flags::process(&isolate.state.flags); + tokio_util::init(|| { + isolate + .execute("deno_main.js", "denoMain();") + .unwrap_or_else(|err| { + error!("{}", err); + std::process::exit(1); + }); + isolate.event_loop(); + }); } diff --git a/src/tokio_util.rs b/src/tokio_util.rs new file mode 100644 index 0000000000000000000000000000000000000000..de81620ef7fa5d12ec800ca47abc6f9e875934f1 --- /dev/null +++ b/src/tokio_util.rs @@ -0,0 +1,30 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. + +use futures; +use futures::Future; +use tokio; +use tokio_executor; + +pub fn block_on(future: F) -> Result +where + F: Send + 'static + Future, + R: Send + 'static, + E: Send + 'static, +{ + let (tx, rx) = futures::sync::oneshot::channel(); + tokio::spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + rx.wait().unwrap() +} + +// Set the default executor so we can use tokio::spawn(). It's difficult to +// pass around mut references to the runtime, so using with_default is +// preferable. Ideally Tokio would provide this function. +pub fn init(f: F) +where + F: FnOnce(), +{ + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut executor = rt.executor(); + let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f()); +}