diff --git a/cli/lib.rs b/cli/lib.rs index 3d772bb83f6bf5f6bdd885bde45f454675bdc006..17ca94b555178ecbd54a522a4e4fb3ea3ea8bfc3 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -43,6 +43,7 @@ pub mod permissions; mod progress; mod repl; pub mod resolve_addr; +pub mod resources; mod shell; mod signal; pub mod source_maps; @@ -56,7 +57,6 @@ pub mod worker; use crate::deno_error::js_check; use crate::deno_error::print_err_and_exit; use crate::global_state::ThreadSafeGlobalState; -use crate::ops::io::get_stdio; use crate::progress::Progress; use crate::state::ThreadSafeState; use crate::worker::Worker; @@ -128,15 +128,6 @@ fn create_worker_and_state( .map_err(deno_error::print_err_and_exit) .unwrap(); - let state_ = state.clone(); - { - let mut resource_table = state_.lock_resource_table(); - let (stdin, stdout, stderr) = get_stdio(); - resource_table.add("stdin", Box::new(stdin)); - resource_table.add("stdout", Box::new(stdout)); - resource_table.add("stderr", Box::new(stderr)); - } - let worker = Worker::new( "main".to_string(), startup_data::deno_isolate_init(), diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index 355a2463406bd359453e31393484e91c889a261f..c19521bf128136c4d4f6c68316d8769e67f6c828 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -15,6 +15,7 @@ use deno::PinnedBuf; use futures::Future; pub type MinimalOp = dyn Future + Send; +pub type Dispatcher = fn(i32, Option) -> Box; #[derive(Copy, Clone, Debug, PartialEq)] // This corresponds to RecordMinimal on the TS side. @@ -111,10 +112,9 @@ fn test_parse_min_record() { assert_eq!(parse_min_record(&buf), None); } -pub fn minimal_op(d: D) -> impl Fn(&[u8], Option) -> CoreOp -where - D: Fn(i32, Option) -> Box, -{ +pub fn minimal_op( + d: Dispatcher, +) -> impl Fn(&[u8], Option) -> CoreOp { move |control: &[u8], zero_copy: Option| { let mut record = match parse_min_record(control) { Some(r) => r, diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index a1c0fe29cb139b32a897dca4f44854171a3d8b6d..14333117100b9961fe1004eb47f9e734a0179394 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -1,9 +1,8 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use super::io::StreamResource; -use crate::http_body::HttpBody; use crate::http_util::get_client; use crate::ops::json_op; +use crate::resources; use crate::state::ThreadSafeState; use deno::*; use http::header::HeaderName; @@ -55,7 +54,6 @@ pub fn op_fetch( request = request.header(name, v); } debug!("Before fetch {}", url); - let state_ = state.clone(); let future = request.send().map_err(ErrBox::from).and_then(move |res| { let status = res.status(); let mut res_headers = Vec::new(); @@ -63,9 +61,8 @@ pub fn op_fetch( res_headers.push((key.to_string(), val.to_str().unwrap().to_owned())); } - let body = HttpBody::from(res.into_body()); - let mut table = state_.lock_resource_table(); - let rid = table.add("httpBody", Box::new(StreamResource::HttpBody(body))); + let body = res.into_body(); + let rid = resources::add_reqwest_body(body); let json_res = json!({ "bodyRid": rid, diff --git a/cli/ops/files.rs b/cli/ops/files.rs index fc1b8e7d8cfe00de2ac0e3088e6974a33d321516..04b5f98bfd269a42318d36af4db2b328f5292652 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -1,11 +1,12 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::fs as deno_fs; use crate::ops::json_op; +use crate::resources; +use crate::resources::CliResource; use crate::state::ThreadSafeState; use deno::*; use futures::Future; @@ -37,7 +38,7 @@ fn op_open( let args: OpenArgs = serde_json::from_value(args)?; let (filename, filename_) = deno_fs::resolve_from_cwd(&args.filename)?; let mode = args.mode.as_ref(); - let state_ = state.clone(); + let mut open_options = tokio::fs::OpenOptions::new(); match mode { @@ -90,8 +91,7 @@ fn op_open( let is_sync = args.promise_id.is_none(); let op = open_options.open(filename).map_err(ErrBox::from).and_then( move |fs_file| { - let mut table = state_.lock_resource_table(); - let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file))); + let rid = resources::add_fs_file(fs_file); futures::future::ok(json!(rid)) }, ); @@ -110,21 +110,21 @@ struct CloseArgs { } fn op_close( - state: &ThreadSafeState, + _state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { let args: CloseArgs = serde_json::from_value(args)?; - let mut table = state.lock_resource_table(); + let mut table = resources::lock_resource_table(); table.close(args.rid as u32).ok_or_else(bad_resource)?; Ok(JsonOp::Sync(json!({}))) } +#[derive(Debug)] pub struct SeekFuture { seek_from: SeekFrom, rid: ResourceId, - state: ThreadSafeState, } impl Future for SeekFuture { @@ -132,13 +132,13 @@ impl Future for SeekFuture { type Error = ErrBox; fn poll(&mut self) -> Poll { - let mut table = self.state.lock_resource_table(); + let mut table = resources::lock_resource_table(); let resource = table - .get_mut::(self.rid) + .get_mut::(self.rid) .ok_or_else(bad_resource)?; let tokio_file = match resource { - StreamResource::FsFile(ref mut file) => file, + CliResource::FsFile(ref mut file) => file, _ => return Err(bad_resource()), }; @@ -156,7 +156,7 @@ struct SeekArgs { } fn op_seek( - state: &ThreadSafeState, + _state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { @@ -177,11 +177,7 @@ fn op_seek( } }; - let fut = SeekFuture { - state: state.clone(), - seek_from, - rid, - }; + let fut = SeekFuture { seek_from, rid }; let op = fut.and_then(move |_| futures::future::ok(json!({}))); if args.promise_id.is_none() { diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 959147f192a699462e7f2bb5d5a8da37b5d32ced..3ede4b4112842afbb5d1b31e95277f27115d3681 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -1,101 +1,19 @@ use super::dispatch_minimal::MinimalOp; use crate::deno_error; use crate::deno_error::bad_resource; -use crate::http_body::HttpBody; use crate::ops::minimal_op; +use crate::resources; +use crate::resources::CliResource; +use crate::resources::DenoAsyncRead; +use crate::resources::DenoAsyncWrite; use crate::state::ThreadSafeState; -use deno::ErrBox; -use deno::Resource; use deno::*; -use futures; use futures::Future; use futures::Poll; -use std; -use tokio; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::net::TcpStream; -use tokio_process; -use tokio_rustls::client::TlsStream as ClientTlsStream; -use tokio_rustls::server::TlsStream as ServerTlsStream; - -#[cfg(not(windows))] -use std::os::unix::io::FromRawFd; - -#[cfg(windows)] -use std::os::windows::io::FromRawHandle; - -#[cfg(windows)] -extern crate winapi; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { - i.register_op( - "read", - s.core_op(minimal_op(s.stateful_minimal_op(op_read))), - ); - i.register_op( - "write", - s.core_op(minimal_op(s.stateful_minimal_op(op_write))), - ); -} - -pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) { - let stdin = StreamResource::Stdin(tokio::io::stdin()); - let stdout = StreamResource::Stdout({ - #[cfg(not(windows))] - let stdout = unsafe { std::fs::File::from_raw_fd(1) }; - #[cfg(windows)] - let stdout = unsafe { - std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle( - winapi::um::winbase::STD_OUTPUT_HANDLE, - )) - }; - tokio::fs::File::from_std(stdout) - }); - let stderr = StreamResource::Stderr(tokio::io::stderr()); - - (stdin, stdout, stderr) -} - -pub enum StreamResource { - Stdin(tokio::io::Stdin), - Stdout(tokio::fs::File), - Stderr(tokio::io::Stderr), - FsFile(tokio::fs::File), - TcpStream(tokio::net::TcpStream), - ServerTlsStream(Box>), - ClientTlsStream(Box>), - HttpBody(HttpBody), - ChildStdin(tokio_process::ChildStdin), - ChildStdout(tokio_process::ChildStdout), - ChildStderr(tokio_process::ChildStderr), -} - -impl Resource for StreamResource {} - -/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait -/// but uses an `ErrBox` error instead of `std::io:Error` -pub trait DenoAsyncRead { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll; -} - -impl DenoAsyncRead for StreamResource { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll { - let r = match self { - StreamResource::FsFile(ref mut f) => f.poll_read(buf), - StreamResource::Stdin(ref mut f) => f.poll_read(buf), - StreamResource::TcpStream(ref mut f) => f.poll_read(buf), - StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf), - StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf), - StreamResource::HttpBody(ref mut f) => f.poll_read(buf), - StreamResource::ChildStdout(ref mut f) => f.poll_read(buf), - StreamResource::ChildStderr(ref mut f) => f.poll_read(buf), - _ => { - return Err(bad_resource()); - } - }; - - r.map_err(ErrBox::from) - } + i.register_op("read", s.core_op(minimal_op(op_read))); + i.register_op("write", s.core_op(minimal_op(op_write))); } #[derive(Debug, PartialEq)] @@ -109,15 +27,14 @@ enum IoState { /// /// The returned future will resolve to both the I/O stream and the buffer /// as well as the number of bytes read once the read operation is completed. -pub fn read(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read +pub fn read(rid: ResourceId, buf: T) -> Read where T: AsMut<[u8]>, { Read { rid, buf, - io_state: IoState::Pending, - state: state.clone(), + state: IoState::Pending, } } @@ -125,11 +42,11 @@ where /// a buffer. /// /// Created by the [`read`] function. +#[derive(Debug)] pub struct Read { rid: ResourceId, buf: T, - io_state: IoState, - state: ThreadSafeState, + state: IoState, } impl Future for Read @@ -140,25 +57,21 @@ where type Error = ErrBox; fn poll(&mut self) -> Poll { - if self.io_state == IoState::Done { + if self.state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = resources::lock_resource_table(); let resource = table - .get_mut::(self.rid) + .get_mut::(self.rid) .ok_or_else(bad_resource)?; let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..])); - self.io_state = IoState::Done; + self.state = IoState::Done; Ok(nread.into()) } } -pub fn op_read( - state: &ThreadSafeState, - rid: i32, - zero_copy: Option, -) -> Box { +pub fn op_read(rid: i32, zero_copy: Option) -> Box { debug!("read rid={}", rid); let zero_copy = match zero_copy { None => { @@ -167,50 +80,19 @@ pub fn op_read( Some(buf) => buf, }; - let fut = read(state, rid as u32, zero_copy) + let fut = read(rid as u32, zero_copy) .map_err(ErrBox::from) .and_then(move |nread| Ok(nread as i32)); Box::new(fut) } -/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait -/// but uses an `ErrBox` error instead of `std::io:Error` -pub trait DenoAsyncWrite { - fn poll_write(&mut self, buf: &[u8]) -> Poll; - - fn shutdown(&mut self) -> Poll<(), ErrBox>; -} - -impl DenoAsyncWrite for StreamResource { - fn poll_write(&mut self, buf: &[u8]) -> Poll { - let r = match self { - StreamResource::FsFile(ref mut f) => f.poll_write(buf), - StreamResource::Stdout(ref mut f) => f.poll_write(buf), - StreamResource::Stderr(ref mut f) => f.poll_write(buf), - StreamResource::TcpStream(ref mut f) => f.poll_write(buf), - StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf), - StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf), - StreamResource::ChildStdin(ref mut f) => f.poll_write(buf), - _ => { - return Err(bad_resource()); - } - }; - - r.map_err(ErrBox::from) - } - - fn shutdown(&mut self) -> futures::Poll<(), ErrBox> { - unimplemented!() - } -} - /// A future used to write some data to a stream. +#[derive(Debug)] pub struct Write { rid: ResourceId, buf: T, - io_state: IoState, - state: ThreadSafeState, + state: IoState, } /// Creates a future that will write some of the buffer `buf` to @@ -218,15 +100,14 @@ pub struct Write { /// /// Any error which happens during writing will cause both the stream and the /// buffer to get destroyed. -pub fn write(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write +pub fn write(rid: ResourceId, buf: T) -> Write where T: AsRef<[u8]>, { Write { rid, buf, - io_state: IoState::Pending, - state: state.clone(), + state: IoState::Pending, } } @@ -240,25 +121,21 @@ where type Error = ErrBox; fn poll(&mut self) -> Poll { - if self.io_state == IoState::Done { + if self.state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = resources::lock_resource_table(); let resource = table - .get_mut::(self.rid) + .get_mut::(self.rid) .ok_or_else(bad_resource)?; let nwritten = try_ready!(resource.poll_write(self.buf.as_ref())); - self.io_state = IoState::Done; + self.state = IoState::Done; Ok(nwritten.into()) } } -pub fn op_write( - state: &ThreadSafeState, - rid: i32, - zero_copy: Option, -) -> Box { +pub fn op_write(rid: i32, zero_copy: Option) -> Box { debug!("write rid={}", rid); let zero_copy = match zero_copy { None => { @@ -267,7 +144,7 @@ pub fn op_write( Some(buf) => buf, }; - let fut = write(state, rid as u32, zero_copy) + let fut = write(rid as u32, zero_copy) .map_err(ErrBox::from) .and_then(move |nwritten| Ok(nwritten as i32)); diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index 9b33d59189ec72599b4211800c1efcd7f5a2d67c..4e6eb37c815628920aad50e987f0680cae286d35 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -5,7 +5,6 @@ mod dispatch_minimal; pub use dispatch_json::json_op; pub use dispatch_json::JsonOp; pub use dispatch_minimal::minimal_op; -pub use dispatch_minimal::MinimalOp; pub mod compiler; pub mod errors; diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 2fe81e1403bd985fc3b41f33f056c3d4324b0c1b..a4b3bf934ee2b983719d63655be323e7c17dae7f 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,11 +1,12 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; +use crate::resources; +use crate::resources::CliResource; +use crate::resources::Resource; use crate::state::ThreadSafeState; -use deno::Resource; use deno::*; use futures::Async; use futures::Future; @@ -33,19 +34,18 @@ enum AcceptState { } /// Simply accepts a connection. -pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept { +pub fn accept(rid: ResourceId) -> Accept { Accept { - accept_state: AcceptState::Eager, + state: AcceptState::Eager, rid, - state: state.clone(), } } /// A future representing state of accepting a TCP connection. +#[derive(Debug)] pub struct Accept { - accept_state: AcceptState, + state: AcceptState, rid: ResourceId, - state: ThreadSafeState, } impl Future for Accept { @@ -53,11 +53,11 @@ impl Future for Accept { type Error = ErrBox; fn poll(&mut self) -> Poll { - if self.accept_state == AcceptState::Done { + if self.state == AcceptState::Done { panic!("poll Accept after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = resources::lock_resource_table(); let listener_resource = table .get_mut::(self.rid) .ok_or_else(|| { @@ -70,22 +70,22 @@ impl Future for Accept { let listener = &mut listener_resource.listener; - if self.accept_state == AcceptState::Eager { + if self.state == AcceptState::Eager { // Similar to try_ready!, but also track/untrack accept task // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { - self.accept_state = AcceptState::Done; + self.state = AcceptState::Done; return Ok((stream, addr).into()); } Ok(Async::NotReady) => { - self.accept_state = AcceptState::Pending; + self.state = AcceptState::Pending; return Ok(Async::NotReady); } Err(e) => { - self.accept_state = AcceptState::Done; + self.state = AcceptState::Done; return Err(e); } } @@ -94,7 +94,7 @@ impl Future for Accept { match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { listener_resource.untrack_task(); - self.accept_state = AcceptState::Done; + self.state = AcceptState::Done; Ok((stream, addr).into()) } Ok(Async::NotReady) => { @@ -103,7 +103,7 @@ impl Future for Accept { } Err(e) => { listener_resource.untrack_task(); - self.accept_state = AcceptState::Done; + self.state = AcceptState::Done; Err(e) } } @@ -116,25 +116,23 @@ struct AcceptArgs { } fn op_accept( - state: &ThreadSafeState, + _state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { let args: AcceptArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let state_ = state.clone(); - let table = state.lock_resource_table(); + + let table = resources::lock_resource_table(); table .get::(rid) .ok_or_else(bad_resource)?; - let op = accept(state, rid) + let op = accept(rid) .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let mut table = state_.lock_resource_table(); - let rid = - table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); + let rid = resources::add_tcp_stream(tcp_stream); Ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) @@ -163,7 +161,7 @@ fn op_dial( ) -> Result { let args: DialArgs = serde_json::from_value(args)?; assert_eq!(args.transport, "tcp"); // TODO Support others. - let state_ = state.clone(); + state.check_net(&args.hostname, args.port)?; let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { @@ -172,9 +170,7 @@ fn op_dial( .and_then(move |tcp_stream| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let mut table = state_.lock_resource_table(); - let rid = table - .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); + let rid = resources::add_tcp_stream(tcp_stream); Ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) @@ -197,7 +193,7 @@ struct ShutdownArgs { } fn op_shutdown( - state: &ThreadSafeState, + _state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { @@ -212,12 +208,10 @@ fn op_shutdown( _ => unimplemented!(), }; - let mut table = state.lock_resource_table(); - let resource = table - .get_mut::(rid) - .ok_or_else(bad_resource)?; + let mut table = resources::lock_resource_table(); + let resource = table.get_mut::(rid).ok_or_else(bad_resource)?; match resource { - StreamResource::TcpStream(ref mut stream) => { + CliResource::TcpStream(ref mut stream) => { TcpStream::shutdown(stream, shutdown_mode).map_err(ErrBox::from)?; } _ => return Err(bad_resource()), @@ -305,7 +299,7 @@ fn op_listen( task: None, local_addr, }; - let mut table = state.lock_resource_table(); + let mut table = resources::lock_resource_table(); let rid = table.add("tcpListener", Box::new(listener_resource)); Ok(JsonOp::Sync(json!({ diff --git a/cli/ops/process.rs b/cli/ops/process.rs index 237b02fd05f7fd162addab073fd57f3085f6040f..f7897ec519273e24843382b724c3897b2dcd7823 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -1,8 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::ops::json_op; +use crate::resources; +use crate::resources::CloneFileFuture; use crate::signal::kill; use crate::state::ThreadSafeState; use deno::*; @@ -27,41 +28,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill)))); } -struct CloneFileFuture { - rid: ResourceId, - state: ThreadSafeState, -} - -impl Future for CloneFileFuture { - type Item = tokio::fs::File; - type Error = ErrBox; - - fn poll(&mut self) -> Poll { - let mut table = self.state.lock_resource_table(); - let repr = table - .get_mut::(self.rid) - .ok_or_else(bad_resource)?; - match repr { - StreamResource::FsFile(ref mut file) => { - file.poll_try_clone().map_err(ErrBox::from) - } - _ => Err(bad_resource()), - } - } -} - -fn clone_file( - rid: u32, - state: &ThreadSafeState, -) -> Result { - (CloneFileFuture { - rid, - state: state.clone(), - }) - .wait() - .map(|f| f.into_std()) -} - fn subprocess_stdio_map(s: &str) -> std::process::Stdio { match s { "inherit" => std::process::Stdio::inherit(), @@ -99,7 +65,6 @@ fn op_run( let run_args: RunArgs = serde_json::from_value(args)?; state.check_run()?; - let state_ = state.clone(); let args = run_args.args; let env = run_args.env; @@ -118,7 +83,7 @@ fn op_run( // TODO: make this work with other resources, eg. sockets let stdin_rid = run_args.stdin_rid; if stdin_rid > 0 { - let file = clone_file(stdin_rid, &state_)?; + let file = (CloneFileFuture { rid: stdin_rid }).wait()?.into_std(); c.stdin(file); } else { c.stdin(subprocess_stdio_map(run_args.stdin.as_ref())); @@ -126,7 +91,7 @@ fn op_run( let stdout_rid = run_args.stdout_rid; if stdout_rid > 0 { - let file = clone_file(stdout_rid, &state_)?; + let file = (CloneFileFuture { rid: stdout_rid }).wait()?.into_std(); c.stdout(file); } else { c.stdout(subprocess_stdio_map(run_args.stdout.as_ref())); @@ -134,7 +99,7 @@ fn op_run( let stderr_rid = run_args.stderr_rid; if stderr_rid > 0 { - let file = clone_file(stderr_rid, &state_)?; + let file = (CloneFileFuture { rid: stderr_rid }).wait()?.into_std(); c.stderr(file); } else { c.stderr(subprocess_stdio_map(run_args.stderr.as_ref())); @@ -144,42 +109,29 @@ fn op_run( let mut child = c.spawn_async().map_err(ErrBox::from)?; let pid = child.id(); - let mut table = state_.lock_resource_table(); - - let stdin_rid = match child.stdin().take() { - Some(child_stdin) => { - let rid = table.add( - "childStdin", - Box::new(StreamResource::ChildStdin(child_stdin)), - ); - Some(rid) - } - None => None, + let stdin_rid = if child.stdin().is_some() { + let rid = resources::add_child_stdin(child.stdin().take().unwrap()); + Some(rid) + } else { + None }; - let stdout_rid = match child.stdout().take() { - Some(child_stdout) => { - let rid = table.add( - "childStdout", - Box::new(StreamResource::ChildStdout(child_stdout)), - ); - Some(rid) - } - None => None, + let stdout_rid = if child.stdout().is_some() { + let rid = resources::add_child_stdout(child.stdout().take().unwrap()); + Some(rid) + } else { + None }; - let stderr_rid = match child.stderr().take() { - Some(child_stderr) => { - let rid = table.add( - "childStderr", - Box::new(StreamResource::ChildStderr(child_stderr)), - ); - Some(rid) - } - None => None, + let stderr_rid = if child.stderr().is_some() { + let rid = resources::add_child_stderr(child.stderr().take().unwrap()); + Some(rid) + } else { + None }; let child_resource = ChildResource { child }; + let mut table = resources::lock_resource_table(); let child_rid = table.add("child", Box::new(child_resource)); Ok(JsonOp::Sync(json!({ @@ -193,7 +145,6 @@ fn op_run( pub struct ChildStatus { rid: ResourceId, - state: ThreadSafeState, } impl Future for ChildStatus { @@ -201,7 +152,7 @@ impl Future for ChildStatus { type Error = ErrBox; fn poll(&mut self) -> Poll { - let mut table = self.state.lock_resource_table(); + let mut table = resources::lock_resource_table(); let child_resource = table .get_mut::(self.rid) .ok_or_else(bad_resource)?; @@ -226,10 +177,7 @@ fn op_run_status( state.check_run()?; - let future = ChildStatus { - rid, - state: state.clone(), - }; + let future = ChildStatus { rid }; let future = future.and_then(move |run_status| { let code = run_status.code(); diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs index 5919ea586be7b7638754a2c3ea4a74f7805feceb..723fb2571881d5fb270340a253274922c933a782 100644 --- a/cli/ops/repl.rs +++ b/cli/ops/repl.rs @@ -4,8 +4,9 @@ use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::repl; use crate::repl::Repl; +use crate::resources; +use crate::resources::Resource; use crate::state::ThreadSafeState; -use deno::Resource; use deno::*; use std::sync::Arc; use std::sync::Mutex; @@ -43,7 +44,7 @@ fn op_repl_start( repl::history_path(&state.global_state.dir, &args.history_file); let repl = repl::Repl::new(history_path); let resource = ReplResource(Arc::new(Mutex::new(repl))); - let mut table = state.lock_resource_table(); + let mut table = resources::lock_resource_table(); let rid = table.add("repl", Box::new(resource)); Ok(JsonOp::Sync(json!(rid))) } @@ -55,7 +56,7 @@ struct ReplReadlineArgs { } fn op_repl_readline( - state: &ThreadSafeState, + _state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { @@ -63,10 +64,9 @@ fn op_repl_readline( let rid = args.rid as u32; let prompt = args.prompt; debug!("op_repl_readline {} {}", rid, prompt); - let state = state.clone(); blocking_json(false, move || { - let table = state.lock_resource_table(); + let table = resources::lock_resource_table(); let resource = table.get::(rid).ok_or_else(bad_resource)?; let repl = resource.0.clone(); let line = repl.lock().unwrap().readline(&prompt)?; diff --git a/cli/ops/resources.rs b/cli/ops/resources.rs index c35e9762c8e47df7989e00f361a40b94cee80ab0..d92c6a83c341713f9a6c070c8613943c046c45af 100644 --- a/cli/ops/resources.rs +++ b/cli/ops/resources.rs @@ -1,6 +1,7 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{JsonOp, Value}; use crate::ops::json_op; +use crate::resources::lock_resource_table; use crate::state::ThreadSafeState; use deno::*; @@ -9,11 +10,11 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { } fn op_resources( - state: &ThreadSafeState, + _state: &ThreadSafeState, _args: Value, _zero_copy: Option, ) -> Result { - let resource_table = state.lock_resource_table(); + let resource_table = lock_resource_table(); let serialized_resources = resource_table.entries(); Ok(JsonOp::Sync(json!(serialized_resources))) } diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 48419f76f16c7756cad89e75488073da58889641..6e8348c915e1bfa4edeb1973303375fa7591123b 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -1,13 +1,13 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; +use crate::resources; +use crate::resources::Resource; use crate::state::ThreadSafeState; -use deno::Resource; use deno::*; use futures::Async; use futures::Future; @@ -60,7 +60,7 @@ pub fn op_dial_tls( ) -> Result { let args: DialTLSArgs = serde_json::from_value(args)?; let cert_file = args.cert_file; - let state_ = state.clone(); + state.check_net(&args.hostname, args.port)?; if let Some(path) = cert_file.clone() { state.check_read(&path)?; @@ -99,11 +99,7 @@ pub fn op_dial_tls( .connect(dnsname, tcp_stream) .map_err(ErrBox::from) .and_then(move |tls_stream| { - let mut table = state_.lock_resource_table(); - let rid = table.add( - "clientTlsStream", - Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), - ); + let rid = resources::add_tls_stream(tls_stream); futures::future::ok(json!({ "rid": rid, "localAddr": local_addr.to_string(), @@ -269,7 +265,7 @@ fn op_listen_tls( task: None, local_addr, }; - let mut table = state.lock_resource_table(); + let mut table = resources::lock_resource_table(); let rid = table.add("tlsListener", Box::new(tls_listener_resource)); Ok(JsonOp::Sync(json!({ @@ -286,19 +282,18 @@ enum AcceptTlsState { } /// Simply accepts a TLS connection. -pub fn accept_tls(state: &ThreadSafeState, rid: ResourceId) -> AcceptTls { +pub fn accept_tls(rid: ResourceId) -> AcceptTls { AcceptTls { - accept_state: AcceptTlsState::Eager, + state: AcceptTlsState::Eager, rid, - state: state.clone(), } } /// A future representing state of accepting a TLS connection. +#[derive(Debug)] pub struct AcceptTls { - accept_state: AcceptTlsState, + state: AcceptTlsState, rid: ResourceId, - state: ThreadSafeState, } impl Future for AcceptTls { @@ -306,11 +301,11 @@ impl Future for AcceptTls { type Error = ErrBox; fn poll(&mut self) -> Poll { - if self.accept_state == AcceptTlsState::Done { + if self.state == AcceptTlsState::Done { panic!("poll AcceptTls after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = resources::lock_resource_table(); let listener_resource = table .get_mut::(self.rid) .ok_or_else(|| { @@ -323,22 +318,22 @@ impl Future for AcceptTls { let listener = &mut listener_resource.listener; - if self.accept_state == AcceptTlsState::Eager { + if self.state == AcceptTlsState::Eager { // Similar to try_ready!, but also track/untrack accept task // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { - self.accept_state = AcceptTlsState::Done; + self.state = AcceptTlsState::Done; return Ok((stream, addr).into()); } Ok(Async::NotReady) => { - self.accept_state = AcceptTlsState::Pending; + self.state = AcceptTlsState::Pending; return Ok(Async::NotReady); } Err(e) => { - self.accept_state = AcceptTlsState::Done; + self.state = AcceptTlsState::Done; return Err(e); } } @@ -347,7 +342,7 @@ impl Future for AcceptTls { match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { listener_resource.untrack_task(); - self.accept_state = AcceptTlsState::Done; + self.state = AcceptTlsState::Done; Ok((stream, addr).into()) } Ok(Async::NotReady) => { @@ -356,7 +351,7 @@ impl Future for AcceptTls { } Err(e) => { listener_resource.untrack_task(); - self.accept_state = AcceptTlsState::Done; + self.state = AcceptTlsState::Done; Err(e) } } @@ -369,22 +364,21 @@ struct AcceptTlsArgs { } fn op_accept_tls( - state: &ThreadSafeState, + _state: &ThreadSafeState, args: Value, _zero_copy: Option, ) -> Result { let args: AcceptTlsArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let state1 = state.clone(); - let state2 = state.clone(); - let op = accept_tls(state, rid) + + let op = accept_tls(rid) .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; Ok((tcp_stream, local_addr, remote_addr)) }) .and_then(move |(tcp_stream, local_addr, remote_addr)| { - let table = state1.lock_resource_table(); + let table = resources::lock_resource_table(); let resource = table .get::(rid) .ok_or_else(bad_resource) @@ -395,11 +389,7 @@ fn op_accept_tls( .accept(tcp_stream) .map_err(ErrBox::from) .and_then(move |tls_stream| { - let mut table = state2.lock_resource_table(); - let rid = table.add( - "serverTlsStream", - Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), - ); + let rid = resources::add_server_tls_stream(tls_stream); Ok((rid, local_addr, remote_addr)) }) }) diff --git a/cli/resources.rs b/cli/resources.rs new file mode 100644 index 0000000000000000000000000000000000000000..db9b43eeb114bd9b11cee1aad631d383d37cc60c --- /dev/null +++ b/cli/resources.rs @@ -0,0 +1,209 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. + +// Think of Resources as File Descriptors. They are integers that are allocated +// by the privileged side of Deno to refer to various resources. The simplest +// example are standard file system files and stdio - but there will be other +// resources added in the future that might not correspond to operating system +// level File Descriptors. To avoid confusion we call them "resources" not "file +// descriptors". This module implements a global resource table. Ops (AKA +// handlers) look up resources by their integer id here. + +use crate::deno_error::bad_resource; +use crate::http_body::HttpBody; +use deno::ErrBox; +pub use deno::Resource; +pub use deno::ResourceId; +use deno::ResourceTable; + +use futures; +use futures::Future; +use futures::Poll; +use reqwest::r#async::Decoder as ReqwestDecoder; +use std; +use std::sync::Mutex; +use std::sync::MutexGuard; +use tokio; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::TcpStream; +use tokio_process; +use tokio_rustls::client::TlsStream as ClientTlsStream; +use tokio_rustls::server::TlsStream as ServerTlsStream; + +#[cfg(not(windows))] +use std::os::unix::io::FromRawFd; + +#[cfg(windows)] +use std::os::windows::io::FromRawHandle; + +#[cfg(windows)] +extern crate winapi; + +lazy_static! { + static ref RESOURCE_TABLE: Mutex = Mutex::new({ + let mut table = ResourceTable::default(); + + // TODO Load these lazily during lookup? + table.add("stdin", Box::new(CliResource::Stdin(tokio::io::stdin()))); + + table.add("stdout", Box::new(CliResource::Stdout({ + #[cfg(not(windows))] + let stdout = unsafe { std::fs::File::from_raw_fd(1) }; + #[cfg(windows)] + let stdout = unsafe { + std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle( + winapi::um::winbase::STD_OUTPUT_HANDLE)) + }; + tokio::fs::File::from_std(stdout) + }))); + + table.add("stderr", Box::new(CliResource::Stderr(tokio::io::stderr()))); + table + }); +} + +// TODO: rename to `StreamResource` +pub enum CliResource { + Stdin(tokio::io::Stdin), + Stdout(tokio::fs::File), + Stderr(tokio::io::Stderr), + FsFile(tokio::fs::File), + TcpStream(tokio::net::TcpStream), + ServerTlsStream(Box>), + ClientTlsStream(Box>), + HttpBody(HttpBody), + ChildStdin(tokio_process::ChildStdin), + ChildStdout(tokio_process::ChildStdout), + ChildStderr(tokio_process::ChildStderr), +} + +impl Resource for CliResource {} + +pub fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> { + RESOURCE_TABLE.lock().unwrap() +} + +/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait +/// but uses an `ErrBox` error instead of `std::io:Error` +pub trait DenoAsyncRead { + fn poll_read(&mut self, buf: &mut [u8]) -> Poll; +} + +impl DenoAsyncRead for CliResource { + fn poll_read(&mut self, buf: &mut [u8]) -> Poll { + let r = match self { + CliResource::FsFile(ref mut f) => f.poll_read(buf), + CliResource::Stdin(ref mut f) => f.poll_read(buf), + CliResource::TcpStream(ref mut f) => f.poll_read(buf), + CliResource::ClientTlsStream(ref mut f) => f.poll_read(buf), + CliResource::ServerTlsStream(ref mut f) => f.poll_read(buf), + CliResource::HttpBody(ref mut f) => f.poll_read(buf), + CliResource::ChildStdout(ref mut f) => f.poll_read(buf), + CliResource::ChildStderr(ref mut f) => f.poll_read(buf), + _ => { + return Err(bad_resource()); + } + }; + + r.map_err(ErrBox::from) + } +} + +/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait +/// but uses an `ErrBox` error instead of `std::io:Error` +pub trait DenoAsyncWrite { + fn poll_write(&mut self, buf: &[u8]) -> Poll; + + fn shutdown(&mut self) -> Poll<(), ErrBox>; +} + +impl DenoAsyncWrite for CliResource { + fn poll_write(&mut self, buf: &[u8]) -> Poll { + let r = match self { + CliResource::FsFile(ref mut f) => f.poll_write(buf), + CliResource::Stdout(ref mut f) => f.poll_write(buf), + CliResource::Stderr(ref mut f) => f.poll_write(buf), + CliResource::TcpStream(ref mut f) => f.poll_write(buf), + CliResource::ClientTlsStream(ref mut f) => f.poll_write(buf), + CliResource::ServerTlsStream(ref mut f) => f.poll_write(buf), + CliResource::ChildStdin(ref mut f) => f.poll_write(buf), + _ => { + return Err(bad_resource()); + } + }; + + r.map_err(ErrBox::from) + } + + fn shutdown(&mut self) -> futures::Poll<(), ErrBox> { + unimplemented!() + } +} + +pub fn add_fs_file(fs_file: tokio::fs::File) -> ResourceId { + let mut table = lock_resource_table(); + table.add("fsFile", Box::new(CliResource::FsFile(fs_file))) +} + +pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> ResourceId { + let mut table = lock_resource_table(); + table.add("tcpStream", Box::new(CliResource::TcpStream(stream))) +} + +pub fn add_tls_stream(stream: ClientTlsStream) -> ResourceId { + let mut table = lock_resource_table(); + table.add( + "clientTlsStream", + Box::new(CliResource::ClientTlsStream(Box::new(stream))), + ) +} + +pub fn add_server_tls_stream(stream: ServerTlsStream) -> ResourceId { + let mut table = lock_resource_table(); + table.add( + "serverTlsStream", + Box::new(CliResource::ServerTlsStream(Box::new(stream))), + ) +} + +pub fn add_reqwest_body(body: ReqwestDecoder) -> ResourceId { + let body = HttpBody::from(body); + let mut table = lock_resource_table(); + table.add("httpBody", Box::new(CliResource::HttpBody(body))) +} + +pub fn add_child_stdin(stdin: tokio_process::ChildStdin) -> ResourceId { + let mut table = lock_resource_table(); + table.add("childStdin", Box::new(CliResource::ChildStdin(stdin))) +} + +pub fn add_child_stdout(stdout: tokio_process::ChildStdout) -> ResourceId { + let mut table = lock_resource_table(); + table.add("childStdout", Box::new(CliResource::ChildStdout(stdout))) +} + +pub fn add_child_stderr(stderr: tokio_process::ChildStderr) -> ResourceId { + let mut table = lock_resource_table(); + table.add("childStderr", Box::new(CliResource::ChildStderr(stderr))) +} + +pub struct CloneFileFuture { + pub rid: ResourceId, +} + +impl Future for CloneFileFuture { + type Item = tokio::fs::File; + type Error = ErrBox; + + fn poll(&mut self) -> Poll { + let mut table = lock_resource_table(); + let repr = table + .get_mut::(self.rid) + .ok_or_else(bad_resource)?; + match repr { + CliResource::FsFile(ref mut file) => { + file.poll_try_clone().map_err(ErrBox::from) + } + _ => Err(bad_resource()), + } + } +} diff --git a/cli/state.rs b/cli/state.rs index a5e9546b0d671f67fd9bde6f4d6699187972f422..edfac72c02e82a2544ccc04dd9acc0b99fdda940 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -5,7 +5,6 @@ use crate::global_timer::GlobalTimer; use crate::import_map::ImportMap; use crate::metrics::Metrics; use crate::ops::JsonOp; -use crate::ops::MinimalOp; use crate::permissions::DenoPermissions; use crate::worker::Worker; use crate::worker::WorkerChannels; @@ -16,7 +15,6 @@ use deno::Loader; use deno::ModuleSpecifier; use deno::Op; use deno::PinnedBuf; -use deno::ResourceTable; use futures::Future; use rand::rngs::StdRng; use rand::SeedableRng; @@ -29,7 +27,6 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; -use std::sync::MutexGuard; use std::time::Instant; use tokio::sync::mpsc; @@ -55,7 +52,6 @@ pub struct State { pub start_time: Instant, pub seeded_rng: Option>, pub include_deno_namespace: bool, - pub resource_table: Mutex, } impl Clone for ThreadSafeState { @@ -72,10 +68,6 @@ impl Deref for ThreadSafeState { } impl ThreadSafeState { - pub fn lock_resource_table(&self) -> MutexGuard { - self.resource_table.lock().unwrap() - } - /// Wrap core `OpDispatcher` to collect metrics. pub fn core_op( &self, @@ -111,21 +103,6 @@ impl ThreadSafeState { } } - /// This is a special function that provides `state` argument to dispatcher. - pub fn stateful_minimal_op( - &self, - dispatcher: D, - ) -> impl Fn(i32, Option) -> Box - where - D: Fn(&ThreadSafeState, i32, Option) -> Box, - { - let state = self.clone(); - - move |rid: i32, zero_copy: Option| -> Box { - dispatcher(&state, rid, zero_copy) - } - } - /// This is a special function that provides `state` argument to dispatcher. /// /// NOTE: This only works with JSON dispatcher. @@ -243,7 +220,6 @@ impl ThreadSafeState { start_time: Instant::now(), seeded_rng, include_deno_namespace, - resource_table: Mutex::new(ResourceTable::default()), }; Ok(ThreadSafeState(Arc::new(state))) diff --git a/core/resources.rs b/core/resources.rs index 216f5c8dfb934ea34c8b706887b2a85c45f1f7ba..da4fb6b0781cb17cb606d3a476a547ee236abe80 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -65,7 +65,7 @@ impl ResourceTable { } // close(2) is done by dropping the value. Therefore we just need to remove - // the resource from the resource table. + // the resource from the RESOURCE_TABLE. pub fn close(&mut self, rid: ResourceId) -> Option<()> { self.map.remove(&rid).map(|(_name, _resource)| ()) }