未验证 提交 aab26d22 编写于 作者: B Bartek Iwańczuk 提交者: GitHub

remove calls to futures::executor::block_on (#4760)

上级 fab0204c
...@@ -455,7 +455,7 @@ impl TsCompiler { ...@@ -455,7 +455,7 @@ impl TsCompiler {
/// ///
/// Along compiled file a special metadata file is saved as well containing /// Along compiled file a special metadata file is saved as well containing
/// hash that can be validated to avoid unnecessary recompilation. /// hash that can be validated to avoid unnecessary recompilation.
async fn cache_compiled_file( fn cache_compiled_file(
&self, &self,
module_specifier: &ModuleSpecifier, module_specifier: &ModuleSpecifier,
contents: &str, contents: &str,
...@@ -468,7 +468,6 @@ impl TsCompiler { ...@@ -468,7 +468,6 @@ impl TsCompiler {
let source_file = self let source_file = self
.file_fetcher .file_fetcher
.fetch_cached_source_file(&module_specifier) .fetch_cached_source_file(&module_specifier)
.await
.expect("Source file not found"); .expect("Source file not found");
let version_hash = source_code_version_hash( let version_hash = source_code_version_hash(
...@@ -528,7 +527,7 @@ impl TsCompiler { ...@@ -528,7 +527,7 @@ impl TsCompiler {
} }
/// This method is called by TS compiler via an "op". /// This method is called by TS compiler via an "op".
pub async fn cache_compiler_output( pub fn cache_compiler_output(
&self, &self,
module_specifier: &ModuleSpecifier, module_specifier: &ModuleSpecifier,
extension: &str, extension: &str,
...@@ -536,7 +535,7 @@ impl TsCompiler { ...@@ -536,7 +535,7 @@ impl TsCompiler {
) -> std::io::Result<()> { ) -> std::io::Result<()> {
match extension { match extension {
".map" => self.cache_source_map(module_specifier, contents), ".map" => self.cache_source_map(module_specifier, contents),
".js" => self.cache_compiled_file(module_specifier, contents).await, ".js" => self.cache_compiled_file(module_specifier, contents),
_ => unreachable!(), _ => unreachable!(),
} }
} }
...@@ -578,10 +577,9 @@ impl TsCompiler { ...@@ -578,10 +577,9 @@ impl TsCompiler {
script_name: &str, script_name: &str,
) -> Option<SourceFile> { ) -> Option<SourceFile> {
if let Some(module_specifier) = self.try_to_resolve(script_name) { if let Some(module_specifier) = self.try_to_resolve(script_name) {
let fut = self return self
.file_fetcher .file_fetcher
.fetch_cached_source_file(&module_specifier); .fetch_cached_source_file(&module_specifier);
return futures::executor::block_on(fut);
} }
None None
......
...@@ -105,10 +105,8 @@ impl SourceFileFetcher { ...@@ -105,10 +105,8 @@ impl SourceFileFetcher {
Ok(()) Ok(())
} }
// TODO(bartlomieju): fetching cached resources should be done
// using blocking fs syscalls
/// Required for TS compiler and source maps. /// Required for TS compiler and source maps.
pub async fn fetch_cached_source_file( pub fn fetch_cached_source_file(
&self, &self,
specifier: &ModuleSpecifier, specifier: &ModuleSpecifier,
) -> Option<SourceFile> { ) -> Option<SourceFile> {
...@@ -124,10 +122,13 @@ impl SourceFileFetcher { ...@@ -124,10 +122,13 @@ impl SourceFileFetcher {
// It should be safe to for caller block on this // It should be safe to for caller block on this
// future, because it doesn't actually do any asynchronous // future, because it doesn't actually do any asynchronous
// action in that path. // action in that path.
self if let Ok(maybe_source_file) =
.get_source_file(specifier.as_url(), true, false, true) self.get_source_file_from_local_cache(specifier.as_url())
.await {
.ok() return maybe_source_file;
}
None
} }
/// Save a given source file into cache. /// Save a given source file into cache.
...@@ -218,6 +219,22 @@ impl SourceFileFetcher { ...@@ -218,6 +219,22 @@ impl SourceFileFetcher {
} }
} }
fn get_source_file_from_local_cache(
&self,
module_url: &Url,
) -> Result<Option<SourceFile>, ErrBox> {
let url_scheme = module_url.scheme();
let is_local_file = url_scheme == "file";
SourceFileFetcher::check_if_supported_scheme(&module_url)?;
// Local files are always fetched from disk bypassing cache entirely.
if is_local_file {
return self.fetch_local_file(&module_url).map(Some);
}
self.fetch_cached_remote_source(&module_url)
}
/// This is main method that is responsible for fetching local or remote files. /// This is main method that is responsible for fetching local or remote files.
/// ///
/// If this is a remote module, and it has not yet been cached, the resulting /// If this is a remote module, and it has not yet been cached, the resulting
......
...@@ -44,13 +44,12 @@ fn op_cache( ...@@ -44,13 +44,12 @@ fn op_cache(
let state_ = &state.borrow().global_state; let state_ = &state.borrow().global_state;
let ts_compiler = state_.ts_compiler.clone(); let ts_compiler = state_.ts_compiler.clone();
let fut = ts_compiler.cache_compiler_output( ts_compiler.cache_compiler_output(
&module_specifier, &module_specifier,
&args.extension, &args.extension,
&args.contents, &args.contents,
); )?;
futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(json!({}))) Ok(JsonOp::Sync(json!({})))
} }
......
...@@ -78,7 +78,7 @@ pub fn op_fs_events_open( ...@@ -78,7 +78,7 @@ pub fn op_fs_events_open(
let mut sender = sender.lock().unwrap(); let mut sender = sender.lock().unwrap();
// Ignore result, if send failed it means that watcher was already closed, // Ignore result, if send failed it means that watcher was already closed,
// but not all messages have been flushed. // but not all messages have been flushed.
let _ = futures::executor::block_on(sender.send(res2)); let _ = sender.try_send(res2);
}) })
.map_err(ErrBox::from)?; .map_err(ErrBox::from)?;
let recursive_mode = if args.recursive { let recursive_mode = if args.recursive {
......
...@@ -215,7 +215,7 @@ fn op_send( ...@@ -215,7 +215,7 @@ fn op_send(
OpError::bad_resource("Socket has been closed".to_string()) OpError::bad_resource("Socket has been closed".to_string())
})?; })?;
let socket = &mut resource.socket; let socket = &mut resource.socket;
let addr = resolve_addr(&args.hostname, args.port).await?; let addr = resolve_addr(&args.hostname, args.port)?;
socket.send_to(&buf, addr).await?; socket.send_to(&buf, addr).await?;
Ok(json!({})) Ok(json!({}))
}; };
...@@ -273,7 +273,7 @@ fn op_connect( ...@@ -273,7 +273,7 @@ fn op_connect(
let state_ = state.clone(); let state_ = state.clone();
state.check_net(&args.hostname, args.port)?; state.check_net(&args.hostname, args.port)?;
let op = async move { let op = async move {
let addr = resolve_addr(&args.hostname, args.port).await?; let addr = resolve_addr(&args.hostname, args.port)?;
let tcp_stream = TcpStream::connect(&addr).await?; let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?; let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?; let remote_addr = tcp_stream.peer_addr()?;
...@@ -500,8 +500,7 @@ fn op_listen( ...@@ -500,8 +500,7 @@ fn op_listen(
transport_args: ArgsEnum::Ip(args), transport_args: ArgsEnum::Ip(args),
} => { } => {
state.check_net(&args.hostname, args.port)?; state.check_net(&args.hostname, args.port)?;
let addr = let addr = resolve_addr(&args.hostname, args.port)?;
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
let (rid, local_addr) = if transport == "tcp" { let (rid, local_addr) = if transport == "tcp" {
listen_tcp(state, addr)? listen_tcp(state, addr)?
} else { } else {
......
...@@ -61,7 +61,7 @@ pub fn op_connect_tls( ...@@ -61,7 +61,7 @@ pub fn op_connect_tls(
} }
let op = async move { let op = async move {
let addr = resolve_addr(&args.hostname, args.port).await?; let addr = resolve_addr(&args.hostname, args.port)?;
let tcp_stream = TcpStream::connect(&addr).await?; let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?; let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?; let remote_addr = tcp_stream.peer_addr()?;
...@@ -237,8 +237,7 @@ fn op_listen_tls( ...@@ -237,8 +237,7 @@ fn op_listen_tls(
.set_single_cert(load_certs(&cert_file)?, load_keys(&key_file)?.remove(0)) .set_single_cert(load_certs(&cert_file)?, load_keys(&key_file)?.remove(0))
.expect("invalid key or certificate"); .expect("invalid key or certificate");
let tls_acceptor = TlsAcceptor::from(Arc::new(config)); let tls_acceptor = TlsAcceptor::from(Arc::new(config));
let addr = let addr = resolve_addr(&args.hostname, args.port)?;
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
let listener = futures::executor::block_on(TcpListener::bind(&addr))?; let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
let local_addr = listener.local_addr()?; let local_addr = listener.local_addr()?;
let tls_listener_resource = TlsListenerResource { let tls_listener_resource = TlsListenerResource {
......
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::op_error::OpError; use crate::op_error::OpError;
use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
/// Resolve network address. Returns a future. /// Resolve network address. Returns a future.
pub fn resolve_addr(hostname: &str, port: u16) -> ResolveAddrFuture { pub fn resolve_addr(hostname: &str, port: u16) -> Result<SocketAddr, OpError> {
ResolveAddrFuture { // Default to localhost if given just the port. Example: ":80"
hostname: hostname.to_string(), let addr: &str = if !hostname.is_empty() {
port, &hostname
} } else {
} "0.0.0.0"
};
pub struct ResolveAddrFuture {
hostname: String,
port: u16,
}
impl Future for ResolveAddrFuture {
type Output = Result<SocketAddr, OpError>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> { // If this looks like an ipv6 IP address. Example: "[2001:db8::1]"
let inner = self.get_mut(); // Then we remove the brackets.
// The implementation of this is not actually async at the moment, let addr = if addr.starts_with('[') && addr.ends_with(']') {
// however we intend to use async DNS resolution in the future and let l = addr.len() - 1;
// so we expose this as a future instead of Result. addr.get(1..l).unwrap()
} else {
// Default to localhost if given just the port. Example: ":80" addr
let addr: &str = if !inner.hostname.is_empty() { };
&inner.hostname let addr_port_pair = (addr, port);
} else { let mut iter = addr_port_pair.to_socket_addrs().map_err(OpError::from)?;
"0.0.0.0" Ok(iter.next().unwrap())
};
// If this looks like an ipv6 IP address. Example: "[2001:db8::1]"
// Then we remove the brackets.
let addr = if addr.starts_with('[') && addr.ends_with(']') {
let l = addr.len() - 1;
addr.get(1..l).unwrap()
} else {
addr
};
let addr_port_pair = (addr, inner.port);
let r = addr_port_pair.to_socket_addrs().map_err(OpError::from);
Poll::Ready(r.and_then(|mut iter| match iter.next() {
Some(a) => Ok(a),
None => panic!("There should be at least one result"),
}))
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use futures::executor::block_on;
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use std::net::Ipv6Addr; use std::net::Ipv6Addr;
use std::net::SocketAddrV4; use std::net::SocketAddrV4;
...@@ -67,7 +37,7 @@ mod tests { ...@@ -67,7 +37,7 @@ mod tests {
fn resolve_addr1() { fn resolve_addr1() {
let expected = let expected =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 80)); SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 80));
let actual = block_on(resolve_addr("127.0.0.1", 80)).unwrap(); let actual = resolve_addr("127.0.0.1", 80).unwrap();
assert_eq!(actual, expected); assert_eq!(actual, expected);
} }
...@@ -75,7 +45,7 @@ mod tests { ...@@ -75,7 +45,7 @@ mod tests {
fn resolve_addr2() { fn resolve_addr2() {
let expected = let expected =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 80)); SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 80));
let actual = block_on(resolve_addr("", 80)).unwrap(); let actual = resolve_addr("", 80).unwrap();
assert_eq!(actual, expected); assert_eq!(actual, expected);
} }
...@@ -83,7 +53,7 @@ mod tests { ...@@ -83,7 +53,7 @@ mod tests {
fn resolve_addr3() { fn resolve_addr3() {
let expected = let expected =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 0, 2, 1), 25)); SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 0, 2, 1), 25));
let actual = block_on(resolve_addr("192.0.2.1", 25)).unwrap(); let actual = resolve_addr("192.0.2.1", 25).unwrap();
assert_eq!(actual, expected); assert_eq!(actual, expected);
} }
...@@ -95,7 +65,7 @@ mod tests { ...@@ -95,7 +65,7 @@ mod tests {
0, 0,
0, 0,
)); ));
let actual = block_on(resolve_addr("[2001:db8::1]", 8080)).unwrap(); let actual = resolve_addr("[2001:db8::1]", 8080).unwrap();
assert_eq!(actual, expected); assert_eq!(actual, expected);
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册