未验证 提交 742a16b5 编写于 作者: B Bartek Iwańczuk 提交者: GitHub

refactor: rewrite file_fetcher to use async fns, lift blocking call (#4037)

上级 90125566
...@@ -451,7 +451,7 @@ impl TsCompiler { ...@@ -451,7 +451,7 @@ impl TsCompiler {
/// ///
/// Along compiled file a special metadata file is saved as well containing /// Along compiled file a special metadata file is saved as well containing
/// hash that can be validated to avoid unnecessary recompilation. /// hash that can be validated to avoid unnecessary recompilation.
fn cache_compiled_file( async fn cache_compiled_file(
&self, &self,
module_specifier: &ModuleSpecifier, module_specifier: &ModuleSpecifier,
contents: &str, contents: &str,
...@@ -459,35 +459,31 @@ impl TsCompiler { ...@@ -459,35 +459,31 @@ impl TsCompiler {
let js_key = self let js_key = self
.disk_cache .disk_cache
.get_cache_filename_with_extension(module_specifier.as_url(), "js"); .get_cache_filename_with_extension(module_specifier.as_url(), "js");
self self.disk_cache.set(&js_key, contents.as_bytes())?;
.disk_cache self.mark_compiled(module_specifier.as_url());
.set(&js_key, contents.as_bytes()) let source_file = self
.and_then(|_| { .file_fetcher
self.mark_compiled(module_specifier.as_url()); .fetch_cached_source_file(&module_specifier)
.await
let source_file = self .expect("Source file not found");
.file_fetcher
.fetch_cached_source_file(&module_specifier) let version_hash = source_code_version_hash(
.expect("Source file not found"); &source_file.source_code,
version::DENO,
let version_hash = source_code_version_hash( &self.config.hash,
&source_file.source_code, );
version::DENO,
&self.config.hash,
);
let compiled_file_metadata = CompiledFileMetadata { let compiled_file_metadata = CompiledFileMetadata {
source_path: source_file.filename, source_path: source_file.filename,
version_hash, version_hash,
}; };
let meta_key = self let meta_key = self
.disk_cache .disk_cache
.get_cache_filename_with_extension(module_specifier.as_url(), "meta"); .get_cache_filename_with_extension(module_specifier.as_url(), "meta");
self.disk_cache.set( self.disk_cache.set(
&meta_key, &meta_key,
compiled_file_metadata.to_json_string()?.as_bytes(), compiled_file_metadata.to_json_string()?.as_bytes(),
) )
})
} }
/// Return associated source map file for given TS module. /// Return associated source map file for given TS module.
...@@ -528,7 +524,7 @@ impl TsCompiler { ...@@ -528,7 +524,7 @@ impl TsCompiler {
} }
/// This method is called by TS compiler via an "op". /// This method is called by TS compiler via an "op".
pub fn cache_compiler_output( pub async fn cache_compiler_output(
&self, &self,
module_specifier: &ModuleSpecifier, module_specifier: &ModuleSpecifier,
extension: &str, extension: &str,
...@@ -536,7 +532,7 @@ impl TsCompiler { ...@@ -536,7 +532,7 @@ impl TsCompiler {
) -> std::io::Result<()> { ) -> std::io::Result<()> {
match extension { match extension {
".map" => self.cache_source_map(module_specifier, contents), ".map" => self.cache_source_map(module_specifier, contents),
".js" => self.cache_compiled_file(module_specifier, contents), ".js" => self.cache_compiled_file(module_specifier, contents).await,
_ => unreachable!(), _ => unreachable!(),
} }
} }
...@@ -576,9 +572,10 @@ impl TsCompiler { ...@@ -576,9 +572,10 @@ impl TsCompiler {
script_name: &str, script_name: &str,
) -> Option<SourceFile> { ) -> Option<SourceFile> {
if let Some(module_specifier) = self.try_to_resolve(script_name) { if let Some(module_specifier) = self.try_to_resolve(script_name) {
return self let fut = self
.file_fetcher .file_fetcher
.fetch_cached_source_file(&module_specifier); .fetch_cached_source_file(&module_specifier);
return futures::executor::block_on(fut);
} }
None None
......
...@@ -10,7 +10,6 @@ use crate::http_util::FetchOnceResult; ...@@ -10,7 +10,6 @@ use crate::http_util::FetchOnceResult;
use crate::msg; use crate::msg;
use deno_core::ErrBox; use deno_core::ErrBox;
use deno_core::ModuleSpecifier; use deno_core::ModuleSpecifier;
use futures::future::Either;
use futures::future::FutureExt; use futures::future::FutureExt;
use regex::Regex; use regex::Regex;
use reqwest; use reqwest;
...@@ -42,9 +41,6 @@ pub struct SourceFile { ...@@ -42,9 +41,6 @@ pub struct SourceFile {
pub source_code: Vec<u8>, pub source_code: Vec<u8>,
} }
pub type SourceFileFuture =
dyn Future<Output = Result<SourceFile, ErrBox>> + Send;
/// Simple struct implementing in-process caching to prevent multiple /// Simple struct implementing in-process caching to prevent multiple
/// fs reads/net fetches for same file. /// fs reads/net fetches for same file.
#[derive(Clone, Default)] #[derive(Clone, Default)]
...@@ -114,8 +110,10 @@ impl SourceFileFetcher { ...@@ -114,8 +110,10 @@ impl SourceFileFetcher {
Ok(()) Ok(())
} }
// TODO(bartlomieju): fetching cached resources should be done
// using blocking fs syscalls
/// Required for TS compiler and source maps. /// Required for TS compiler and source maps.
pub fn fetch_cached_source_file( pub async fn fetch_cached_source_file(
&self, &self,
specifier: &ModuleSpecifier, specifier: &ModuleSpecifier,
) -> Option<SourceFile> { ) -> Option<SourceFile> {
...@@ -127,79 +125,81 @@ impl SourceFileFetcher { ...@@ -127,79 +125,81 @@ impl SourceFileFetcher {
// If file is not in memory cache check if it can be found // If file is not in memory cache check if it can be found
// in local cache - which effectively means trying to fetch // in local cache - which effectively means trying to fetch
// using "--cached-only" flag. We can safely block on this // using "--cached-only" flag.
// future, because it doesn't do any asynchronous action // It should be safe to for caller block on this
// it that path. // future, because it doesn't actually do any asynchronous
let fut = self.get_source_file_async(specifier.as_url(), true, false, true); // action in that path.
self
futures::executor::block_on(fut).ok() .get_source_file_async(specifier.as_url(), true, false, true)
.await
.ok()
} }
pub fn fetch_source_file_async( pub async fn fetch_source_file_async(
&self, &self,
specifier: &ModuleSpecifier, specifier: &ModuleSpecifier,
maybe_referrer: Option<ModuleSpecifier>, maybe_referrer: Option<ModuleSpecifier>,
) -> Pin<Box<SourceFileFuture>> { ) -> Result<SourceFile, ErrBox> {
let module_url = specifier.as_url().to_owned(); let module_url = specifier.as_url().to_owned();
debug!("fetch_source_file_async specifier: {} ", &module_url); debug!("fetch_source_file_async specifier: {} ", &module_url);
// Check if this file was already fetched and can be retrieved from in-process cache. // Check if this file was already fetched and can be retrieved from in-process cache.
if let Some(source_file) = self.source_file_cache.get(specifier.to_string()) let maybe_cached_file = self.source_file_cache.get(specifier.to_string());
{ if let Some(source_file) = maybe_cached_file {
return Box::pin(async { Ok(source_file) }); return Ok(source_file);
} }
let source_file_cache = self.source_file_cache.clone(); let source_file_cache = self.source_file_cache.clone();
let specifier_ = specifier.clone(); let specifier_ = specifier.clone();
let source_file = self.get_source_file_async( let result = self
&module_url, .get_source_file_async(
self.use_disk_cache, &module_url,
self.no_remote, self.use_disk_cache,
self.cached_only, self.no_remote,
); self.cached_only,
)
.await;
Box::pin(async move { match result {
match source_file.await { Ok(mut file) => {
Ok(mut file) => { // TODO: move somewhere?
// TODO: move somewhere? if file.source_code.starts_with(b"#!") {
if file.source_code.starts_with(b"#!") { file.source_code = filter_shebang(file.source_code);
file.source_code = filter_shebang(file.source_code); }
}
// Cache in-process for subsequent access. // Cache in-process for subsequent access.
source_file_cache.set(specifier_.to_string(), file.clone()); source_file_cache.set(specifier_.to_string(), file.clone());
Ok(file) Ok(file)
}
Err(err) => {
let err_kind = err.kind();
let referrer_suffix = if let Some(referrer) = maybe_referrer {
format!(r#" from "{}""#, referrer)
} else {
"".to_owned()
};
// Hack: Check error message for "--cached-only" because the kind
// conflicts with other errors.
let err = if err.to_string().contains("--cached-only") {
let msg = format!(
r#"Cannot find module "{}"{} in cache, --cached-only is specified"#,
module_url, referrer_suffix
);
DenoError::new(ErrorKind::NotFound, msg).into()
} else if err_kind == ErrorKind::NotFound {
let msg = format!(
r#"Cannot resolve module "{}"{}"#,
module_url, referrer_suffix
);
DenoError::new(ErrorKind::NotFound, msg).into()
} else {
err
};
Err(err)
}
} }
}) Err(err) => {
let err_kind = err.kind();
let referrer_suffix = if let Some(referrer) = maybe_referrer {
format!(r#" from "{}""#, referrer)
} else {
"".to_owned()
};
// Hack: Check error message for "--cached-only" because the kind
// conflicts with other errors.
let err = if err.to_string().contains("--cached-only") {
let msg = format!(
r#"Cannot find module "{}"{} in cache, --cached-only is specified"#,
module_url, referrer_suffix
);
DenoError::new(ErrorKind::NotFound, msg).into()
} else if err_kind == ErrorKind::NotFound {
let msg = format!(
r#"Cannot resolve module "{}"{}"#,
module_url, referrer_suffix
);
DenoError::new(ErrorKind::NotFound, msg).into()
} else {
err
};
Err(err)
}
}
} }
/// This is main method that is responsible for fetching local or remote files. /// This is main method that is responsible for fetching local or remote files.
...@@ -213,54 +213,38 @@ impl SourceFileFetcher { ...@@ -213,54 +213,38 @@ impl SourceFileFetcher {
/// ///
/// If `cached_only` is true then this method will fail for remote files /// If `cached_only` is true then this method will fail for remote files
/// not already cached. /// not already cached.
fn get_source_file_async( async fn get_source_file_async(
&self, &self,
module_url: &Url, module_url: &Url,
use_disk_cache: bool, use_disk_cache: bool,
no_remote: bool, no_remote: bool,
cached_only: bool, cached_only: bool,
) -> impl Future<Output = Result<SourceFile, ErrBox>> { ) -> Result<SourceFile, ErrBox> {
let url_scheme = module_url.scheme(); let url_scheme = module_url.scheme();
let is_local_file = url_scheme == "file"; let is_local_file = url_scheme == "file";
SourceFileFetcher::check_if_supported_scheme(&module_url)?;
if let Err(err) = SourceFileFetcher::check_if_supported_scheme(&module_url)
{
return Either::Left(futures::future::err(err));
}
// Local files are always fetched from disk bypassing cache entirely. // Local files are always fetched from disk bypassing cache entirely.
if is_local_file { if is_local_file {
match self.fetch_local_file(&module_url) { return self.fetch_local_file(&module_url);
Ok(source_file) => {
return Either::Left(futures::future::ok(source_file));
}
Err(err) => {
return Either::Left(futures::future::err(err));
}
}
} }
// The file is remote, fail if `no_remote` is true. // The file is remote, fail if `no_remote` is true.
if no_remote { if no_remote {
return Either::Left(futures::future::err( let e = std::io::Error::new(
std::io::Error::new( std::io::ErrorKind::NotFound,
std::io::ErrorKind::NotFound, format!(
format!( "Not allowed to get remote file '{}'",
"Not allowed to get remote file '{}'", module_url.to_string()
module_url.to_string() ),
), );
) return Err(e.into());
.into(),
));
} }
// Fetch remote file and cache on-disk for subsequent access // Fetch remote file and cache on-disk for subsequent access
Either::Right(self.fetch_remote_source_async( self
&module_url, .fetch_remote_source_async(&module_url, use_disk_cache, cached_only, 10)
use_disk_cache, .await
cached_only,
10,
))
} }
/// Fetch local source file. /// Fetch local source file.
...@@ -355,16 +339,19 @@ impl SourceFileFetcher { ...@@ -355,16 +339,19 @@ impl SourceFileFetcher {
} }
/// Asynchronously fetch remote source file specified by the URL following redirects. /// Asynchronously fetch remote source file specified by the URL following redirects.
///
/// Note that this is a recursive method so it can't be "async", but rather return
/// Pin<Box<..>>.
fn fetch_remote_source_async( fn fetch_remote_source_async(
&self, &self,
module_url: &Url, module_url: &Url,
use_disk_cache: bool, use_disk_cache: bool,
cached_only: bool, cached_only: bool,
redirect_limit: i64, redirect_limit: i64,
) -> Pin<Box<SourceFileFuture>> { ) -> Pin<Box<dyn Future<Output = Result<SourceFile, ErrBox>>>> {
if redirect_limit < 0 { if redirect_limit < 0 {
let e = DenoError::new(ErrorKind::Http, "too many redirects".to_string()); let e = DenoError::new(ErrorKind::Http, "too many redirects".to_string());
return futures::future::err(e.into()).boxed(); return futures::future::err(e.into()).boxed_local();
} }
let is_blacklisted = let is_blacklisted =
...@@ -373,13 +360,13 @@ impl SourceFileFetcher { ...@@ -373,13 +360,13 @@ impl SourceFileFetcher {
if use_disk_cache && !is_blacklisted { if use_disk_cache && !is_blacklisted {
match self.fetch_cached_remote_source(&module_url) { match self.fetch_cached_remote_source(&module_url) {
Ok(Some(source_file)) => { Ok(Some(source_file)) => {
return futures::future::ok(source_file).boxed(); return futures::future::ok(source_file).boxed_local();
} }
Ok(None) => { Ok(None) => {
// there's no cached version // there's no cached version
} }
Err(err) => { Err(err) => {
return futures::future::err(err).boxed(); return futures::future::err(err).boxed_local();
} }
} }
} }
...@@ -397,7 +384,7 @@ impl SourceFileFetcher { ...@@ -397,7 +384,7 @@ impl SourceFileFetcher {
) )
.into(), .into(),
) )
.boxed(); .boxed_local();
} }
eprintln!( eprintln!(
...@@ -470,7 +457,7 @@ impl SourceFileFetcher { ...@@ -470,7 +457,7 @@ impl SourceFileFetcher {
} }
}; };
f.boxed() f.boxed_local()
} }
} }
......
...@@ -8,6 +8,7 @@ use crate::ops::json_op; ...@@ -8,6 +8,7 @@ use crate::ops::json_op;
use crate::state::State; use crate::state::State;
use deno_core::Loader; use deno_core::Loader;
use deno_core::*; use deno_core::*;
use futures::future::FutureExt;
pub fn init(i: &mut Isolate, s: &State) { pub fn init(i: &mut Isolate, s: &State) {
i.register_op("cache", s.core_op(json_op(s.stateful_op(op_cache)))); i.register_op("cache", s.core_op(json_op(s.stateful_op(op_cache))));
...@@ -43,16 +44,15 @@ fn op_cache( ...@@ -43,16 +44,15 @@ fn op_cache(
let module_specifier = ModuleSpecifier::resolve_url(&args.module_id) let module_specifier = ModuleSpecifier::resolve_url(&args.module_id)
.expect("Should be valid module specifier"); .expect("Should be valid module specifier");
state let state_ = &state.borrow().global_state;
.borrow() let ts_compiler = state_.ts_compiler.clone();
.global_state let fut = ts_compiler.cache_compiler_output(
.ts_compiler &module_specifier,
.cache_compiler_output( &args.extension,
&module_specifier, &args.contents,
&args.extension, );
&args.contents,
)?;
futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(json!({}))) Ok(JsonOp::Sync(json!({})))
} }
...@@ -102,21 +102,27 @@ fn op_fetch_source_files( ...@@ -102,21 +102,27 @@ fn op_fetch_source_files(
None None
}; };
let mut futures = vec![];
let global_state = state.borrow().global_state.clone(); let global_state = state.borrow().global_state.clone();
let file_fetcher = global_state.file_fetcher.clone();
for specifier in &args.specifiers { let specifiers = args.specifiers.clone();
let resolved_specifier = let future = async move {
ModuleSpecifier::resolve_url(&specifier).expect("Invalid specifier"); let file_futures: Vec<_> = specifiers
let fut = global_state .into_iter()
.file_fetcher .map(|specifier| {
.fetch_source_file_async(&resolved_specifier, ref_specifier.clone()); let file_fetcher_ = file_fetcher.clone();
futures.push(fut); let ref_specifier_ = ref_specifier.clone();
} async move {
let resolved_specifier = ModuleSpecifier::resolve_url(&specifier)
let future = Box::pin(async move { .expect("Invalid specifier");
let files = try_join_all(futures).await?; file_fetcher_
.fetch_source_file_async(&resolved_specifier, ref_specifier_)
.await
}
.boxed_local()
})
.collect();
let files = try_join_all(file_futures).await?;
// We want to get an array of futures that resolves to // We want to get an array of futures that resolves to
let v = files.into_iter().map(|f| { let v = files.into_iter().map(|f| {
async { async {
...@@ -156,7 +162,8 @@ fn op_fetch_source_files( ...@@ -156,7 +162,8 @@ fn op_fetch_source_files(
let v = try_join_all(v).await?; let v = try_join_all(v).await?;
Ok(v.into()) Ok(v.into())
}); }
.boxed_local();
Ok(JsonOp::Async(future)) Ok(JsonOp::Async(future))
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册