worker.rs 10.1 KB
Newer Older
R
Ryan Dahl 已提交
1
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
2
use crate::fmt_errors::JSError;
3
use crate::state::ThreadSafeState;
4
use crate::tokio_util;
5
use deno;
6
use deno::ErrBox;
7
use deno::ModuleSpecifier;
B
Bert Belder 已提交
8
use deno::RecursiveLoad;
R
Ryan Dahl 已提交
9
use deno::StartupData;
10
use futures::Async;
R
Ryan Dahl 已提交
11
use futures::Future;
12
use std::env;
13 14
use std::sync::Arc;
use std::sync::Mutex;
15
use url::Url;
B
Bartek Iwańczuk 已提交
16

17
/// Wraps deno::Isolate to provide source maps, ops for the CLI, and
18
/// high-level module loading
19
#[derive(Clone)]
20
pub struct Worker {
21
  isolate: Arc<Mutex<deno::Isolate>>,
22
  pub state: ThreadSafeState,
23 24
}

25 26 27 28
impl Worker {
  pub fn new(
    _name: String,
    startup_data: StartupData,
29
    state: ThreadSafeState,
30
  ) -> Worker {
31 32 33
    let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false)));
    {
      let mut i = isolate.lock().unwrap();
B
Bert Belder 已提交
34

35
      let state_ = state.clone();
R
Ryan Dahl 已提交
36 37
      i.set_dispatch(move |op_id, control_buf, zero_copy_buf| {
        state_.dispatch(op_id, control_buf, zero_copy_buf)
38
      });
B
Bert Belder 已提交
39 40 41 42 43 44 45 46 47 48 49 50 51

      let state_ = state.clone();
      i.set_dyn_import(move |id, specifier, referrer| {
        let load_stream = RecursiveLoad::dynamic_import(
          id,
          specifier,
          referrer,
          state_.clone(),
          state_.modules.clone(),
        );
        Box::new(load_stream)
      });

52 53
      let state_ = state.clone();
      i.set_js_error_create(move |v8_exception| {
B
Bartek Iwańczuk 已提交
54
        JSError::from_v8_exception(v8_exception, &state_.ts_compiler)
55
      })
56
    }
57
    Self { isolate, state }
58 59
  }

60
  /// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
61
  pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> {
62 63 64
    let path = env::current_dir().unwrap().join("__anonymous__");
    let url = Url::from_file_path(path).unwrap();
    self.execute2(url.as_str(), js_source)
65 66 67 68 69
  }

  /// Executes the provided JavaScript source code. The js_filename argument is
  /// provided only for debugging purposes.
  pub fn execute2(
70
    &mut self,
71 72
    js_filename: &str,
    js_source: &str,
73
  ) -> Result<(), ErrBox> {
74
    let mut isolate = self.isolate.lock().unwrap();
75
    isolate.execute(js_filename, js_source)
76 77
  }

78
  /// Executes the provided JavaScript module.
79
  pub fn execute_mod_async(
80
    &mut self,
81
    module_specifier: &ModuleSpecifier,
82
    is_prefetch: bool,
83
  ) -> impl Future<Item = (), Error = ErrBox> {
84 85
    let worker = self.clone();
    let loader = self.state.clone();
86
    let isolate = self.isolate.clone();
87
    let modules = self.state.modules.clone();
B
Bert Belder 已提交
88 89 90
    let recursive_load =
      RecursiveLoad::main(&module_specifier.to_string(), loader, modules)
        .get_future(isolate);
91 92 93 94 95 96 97 98 99
    recursive_load.and_then(move |id| -> Result<(), ErrBox> {
      worker.state.progress.done();
      if is_prefetch {
        Ok(())
      } else {
        let mut isolate = worker.isolate.lock().unwrap();
        isolate.mod_evaluate(id)
      }
    })
100 101
  }

102
  /// Executes the provided JavaScript module.
103
  pub fn execute_mod(
104
    &mut self,
105
    module_specifier: &ModuleSpecifier,
106
    is_prefetch: bool,
107
  ) -> Result<(), ErrBox> {
108
    tokio_util::block_on(self.execute_mod_async(module_specifier, is_prefetch))
109
  }
110
}
111

112
impl Future for Worker {
113
  type Item = ();
114
  type Error = ErrBox;
115

116
  fn poll(&mut self) -> Result<Async<()>, ErrBox> {
117
    let mut isolate = self.isolate.lock().unwrap();
118
    isolate.poll()
119 120
  }
}
121

R
Ryan Dahl 已提交
122 123 124
#[cfg(test)]
mod tests {
  use super::*;
125
  use crate::flags;
A
andy finch 已提交
126
  use crate::ops::op_selector_std;
R
Ryan Dahl 已提交
127
  use crate::progress::Progress;
128 129
  use crate::resources;
  use crate::startup_data;
130
  use crate::state::ThreadSafeState;
131
  use crate::tokio_util;
132 133
  use futures::future::lazy;
  use std::sync::atomic::Ordering;
134 135

  /// Runs tests/esm_imports_a.js through `execute_mod` and checks the
  /// resolve and compiler-start counters recorded in the state metrics.
  #[test]
  fn execute_mod_esm_imports_a() {
    let specifier =
      ModuleSpecifier::resolve_url_or_path("tests/esm_imports_a.js").unwrap();
    let state = ThreadSafeState::new(
      flags::DenoFlags::default(),
      vec![String::from("./deno"), specifier.to_string()],
      op_selector_std,
      Progress::new(),
      true,
    )
    .unwrap();
    // `state` is moved into the worker below; keep a handle for the
    // metric assertions.
    let observed_state = state.clone();

    tokio_util::run(lazy(move || {
      let mut worker =
        Worker::new(String::from("TEST"), StartupData::None, state);
      // A failure is only logged here; the metric assertions below are the
      // actual checks.
      if let Err(err) = worker.execute_mod(&specifier, false) {
        eprintln!("execute_mod err {:?}", err);
      }
      tokio_util::panic_on_error(worker)
    }));

    let metrics = &observed_state.metrics;
    assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2);
    // The compiler must not have been started.
    assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0);
  }

  /// Runs tests/circular1.js through `execute_mod` and checks the resolve
  /// and compiler-start counters recorded in the state metrics.
  #[test]
  fn execute_mod_circular() {
    let specifier =
      ModuleSpecifier::resolve_url_or_path("tests/circular1.js").unwrap();
    let state = ThreadSafeState::new(
      flags::DenoFlags::default(),
      vec![String::from("./deno"), specifier.to_string()],
      op_selector_std,
      Progress::new(),
      true,
    )
    .unwrap();
    // `state` is moved into the worker below; keep a handle for the
    // metric assertions.
    let observed_state = state.clone();

    tokio_util::run(lazy(move || {
      let mut worker =
        Worker::new(String::from("TEST"), StartupData::None, state);
      // A failure is only logged here; the metric assertions below are the
      // actual checks.
      if let Err(err) = worker.execute_mod(&specifier, false) {
        eprintln!("execute_mod err {:?}", err);
      }
      tokio_util::panic_on_error(worker)
    }));

    let metrics = &observed_state.metrics;
    assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2);
    // The compiler must not have been started.
    assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0);
  }
194

R
Ryan Dahl 已提交
195 196
  /// Runs tests/006_url_imports.ts with the `reload` flag set, on an
  /// isolate started from `deno_isolate_init()`, and checks the resolve and
  /// compiler-start counters recorded in the state metrics.
  #[test]
  fn execute_006_url_imports() {
    let specifier =
      ModuleSpecifier::resolve_url_or_path("tests/006_url_imports.ts").unwrap();
    let args = vec![String::from("deno"), specifier.to_string()];

    let mut deno_flags = flags::DenoFlags::default();
    deno_flags.reload = true;

    let state = ThreadSafeState::new(
      deno_flags,
      args,
      op_selector_std,
      Progress::new(),
      true,
    )
    .unwrap();
    // `state` is moved into the worker below; keep a handle for the
    // metric assertions.
    let observed_state = state.clone();

    tokio_util::run(lazy(move || {
      let mut worker = Worker::new(
        String::from("TEST"),
        startup_data::deno_isolate_init(),
        state,
      );
      worker.execute("denoMain()").unwrap();
      // A failure is only logged here; the metric assertions below are the
      // actual checks.
      if let Err(err) = worker.execute_mod(&specifier, false) {
        eprintln!("execute_mod err {:?}", err);
      }
      tokio_util::panic_on_error(worker)
    }));

    let metrics = &observed_state.metrics;
    assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 3);
    // The compiler must have been invoked exactly once.
    assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 1);
  }

226
  fn create_test_worker() -> Worker {
K
Kitson Kelly 已提交
227 228 229 230
    let state = ThreadSafeState::mock(vec![
      String::from("./deno"),
      String::from("hello.js"),
    ]);
231
    let mut worker =
232
      Worker::new("TEST".to_string(), startup_data::deno_isolate_init(), state);
233 234
    worker.execute("denoMain()").unwrap();
    worker.execute("workerMain()").unwrap();
235 236 237 238 239 240 241 242 243 244 245
    worker
  }

  /// Installs an `onmessage` handler in a test worker, posts "hi" and
  /// expects `[1,2,3]` back, then posts "exit".
  #[test]
  fn test_worker_messages() {
    tokio_util::init(|| {
      let mut worker = create_test_worker();
      let source = r#"
        onmessage = function(e) {
          console.log("msg from main script", e.data);
          if (e.data == "exit") {
            delete window.onmessage;
            return;
          } else {
            console.assert(e.data === "hi");
          }
          postMessage([1, 2, 3]);
          console.log("after postMessage");
        }
        "#;
      worker.execute(source).unwrap();

      let resource = worker.state.resource.clone();
      let rid = resource.rid;
      let resource_for_close = resource.clone();

      // Drive the worker in the background; once it finishes, close its
      // resource and surface any error as a panic.
      tokio::spawn(lazy(move || {
        worker.then(move |outcome| -> Result<(), ()> {
          resource_for_close.close();
          outcome.unwrap();
          Ok(())
        })
      }));

      let hi = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
      assert!(resources::post_message_to_worker(rid, hi).wait().is_ok());

      let reply = resources::get_message_from_worker(rid).wait().unwrap();
      assert!(reply.is_some());
      // The reply must be the JSON encoding of [1, 2, 3].
      assert_eq!(*reply.unwrap(), *b"[1,2,3]");

      let exit = json!("exit")
        .to_string()
        .into_boxed_str()
        .into_boxed_bytes();
      assert!(resources::post_message_to_worker(rid, exit).wait().is_ok());
    })
  }

  /// Checks that the worker's resource id is reported as "worker" while the
  /// worker is alive and has no type once the worker future has completed
  /// and closed the resource.
  #[test]
  fn removed_from_resource_table_on_close() {
    tokio_util::init(|| {
      let mut worker = create_test_worker();
      worker
        .execute("onmessage = () => { delete window.onmessage; }")
        .unwrap();

      let resource = worker.state.resource.clone();
      let rid = resource.rid;

      // Shared so that one handle can be spawned and another waited on.
      let done = worker
        .then(move |outcome| -> Result<(), ()> {
          resource.close();
          println!("workers.rs after resource close");
          outcome.unwrap();
          Ok(())
        })
        .shared();

      let spawned = done.clone();
      tokio::spawn(lazy(move || spawned.then(|_| Ok(()))));

      assert_eq!(resources::get_type(rid), Some(String::from("worker")));

      let hi = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
      assert!(resources::post_message_to_worker(rid, hi).wait().is_ok());
      debug!("rid {:?}", rid);

      done.wait().unwrap();
      assert_eq!(resources::get_type(rid), None);
    })
  }
323 324 325

  /// Executing a module that cannot be loaded must yield an error.
  #[test]
  fn execute_mod_resolve_error() {
    tokio_util::init(|| {
      let mut worker = create_test_worker();
      // "does-not-exist" resolves to a specifier (the unwrap succeeds), but
      // executing it is expected to fail.
      let missing =
        ModuleSpecifier::resolve_url_or_path("does-not-exist").unwrap();
      assert!(worker.execute_mod_async(&missing, false).wait().is_err());
    })
  }

  /// Executing ./tests/002_hello.ts in a test worker must succeed.
  #[test]
  fn execute_mod_002_hello() {
    tokio_util::init(|| {
      let mut worker = create_test_worker();
      // The relative path assumes cwd is the project root (an assumption
      // made throughout the tests).
      let hello =
        ModuleSpecifier::resolve_url_or_path("./tests/002_hello.ts").unwrap();
      assert!(worker.execute_mod_async(&hello, false).wait().is_ok());
    })
  }
348
}