worker.rs 9.7 KB
Newer Older
R
Ryan Dahl 已提交
1
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
2
use crate::fmt_errors::JSError;
3
use crate::state::ThreadSafeState;
4
use crate::tokio_util;
5
use deno;
6
use deno::ErrBox;
7
use deno::ModuleSpecifier;
R
Ryan Dahl 已提交
8
use deno::StartupData;
9
use futures::Async;
R
Ryan Dahl 已提交
10
use futures::Future;
11 12
use std::sync::Arc;
use std::sync::Mutex;
B
Bartek Iwańczuk 已提交
13

14
/// Wraps deno::Isolate to provide source maps, ops for the CLI, and
15
/// high-level module loading
16
#[derive(Clone)]
17
pub struct Worker {
18
  isolate: Arc<Mutex<deno::Isolate>>,
19
  pub state: ThreadSafeState,
20 21
}

22 23 24 25
impl Worker {
  pub fn new(
    _name: String,
    startup_data: StartupData,
26
    state: ThreadSafeState,
27
  ) -> Worker {
28 29 30
    let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false)));
    {
      let mut i = isolate.lock().unwrap();
31
      let state_ = state.clone();
32 33 34
      i.set_dispatch(move |control_buf, zero_copy_buf| {
        state_.dispatch(control_buf, zero_copy_buf)
      });
35 36
      let state_ = state.clone();
      i.set_js_error_create(move |v8_exception| {
B
Bartek Iwańczuk 已提交
37
        JSError::from_v8_exception(v8_exception, &state_.ts_compiler)
38
      })
39
    }
40
    Self { isolate, state }
41 42
  }

43
  /// Same as execute2() but the filename defaults to "<anonymous>".
44
  pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> {
45 46 47 48 49 50
    self.execute2("<anonymous>", js_source)
  }

  /// Executes the provided JavaScript source code. The js_filename argument is
  /// provided only for debugging purposes.
  pub fn execute2(
51
    &mut self,
52 53
    js_filename: &str,
    js_source: &str,
54
  ) -> Result<(), ErrBox> {
55
    let mut isolate = self.isolate.lock().unwrap();
56
    isolate.execute(js_filename, js_source)
57 58
  }

59
  /// Executes the provided JavaScript module.
60
  pub fn execute_mod_async(
61
    &mut self,
62
    module_specifier: &ModuleSpecifier,
63
    is_prefetch: bool,
64
  ) -> impl Future<Item = (), Error = ErrBox> {
65 66
    let worker = self.clone();
    let loader = self.state.clone();
67
    let isolate = self.isolate.clone();
68
    let modules = self.state.modules.clone();
69 70 71 72 73 74
    let recursive_load = deno::RecursiveLoad::new(
      &module_specifier.to_string(),
      loader,
      isolate,
      modules,
    );
75 76 77 78 79 80 81 82 83
    recursive_load.and_then(move |id| -> Result<(), ErrBox> {
      worker.state.progress.done();
      if is_prefetch {
        Ok(())
      } else {
        let mut isolate = worker.isolate.lock().unwrap();
        isolate.mod_evaluate(id)
      }
    })
84 85
  }

86
  /// Executes the provided JavaScript module.
87
  pub fn execute_mod(
88
    &mut self,
89
    module_specifier: &ModuleSpecifier,
90
    is_prefetch: bool,
91
  ) -> Result<(), ErrBox> {
92
    tokio_util::block_on(self.execute_mod_async(module_specifier, is_prefetch))
93
  }
94
}
95

96
impl Future for Worker {
97
  type Item = ();
98
  type Error = ErrBox;
99

100
  fn poll(&mut self) -> Result<Async<()>, ErrBox> {
101
    let mut isolate = self.isolate.lock().unwrap();
102
    isolate.poll()
103 104
  }
}
105

R
Ryan Dahl 已提交
106 107 108
#[cfg(test)]
mod tests {
  use super::*;
109
  use crate::flags;
A
andy finch 已提交
110
  use crate::ops::op_selector_std;
R
Ryan Dahl 已提交
111
  use crate::progress::Progress;
112 113
  use crate::resources;
  use crate::startup_data;
114
  use crate::state::ThreadSafeState;
115
  use crate::tokio_util;
116 117
  use futures::future::lazy;
  use std::sync::atomic::Ordering;
118 119

  /// Loads and evaluates `tests/esm_imports_a.js`, then checks the recorded
  /// resolve count and that the compiler was never started.
  #[test]
  fn execute_mod_esm_imports_a() {
    let module_specifier =
      ModuleSpecifier::resolve_url_or_path("tests/esm_imports_a.js").unwrap();
    let state = ThreadSafeState::new(
      flags::DenoFlags::default(),
      vec![String::from("./deno"), module_specifier.to_string()],
      op_selector_std,
      Progress::new(),
      true,
    )
    .unwrap();
    // `state` moves into the task below; keep a handle for the metric checks.
    let observed = state.clone();
    tokio_util::run(lazy(move || {
      let mut worker =
        Worker::new("TEST".to_string(), StartupData::None, state);
      if let Err(err) = worker.execute_mod(&module_specifier, false) {
        eprintln!("execute_mod err {:?}", err);
      }
      tokio_util::panic_on_error(worker)
    }));

    assert_eq!(observed.metrics.resolve_count.load(Ordering::SeqCst), 2);
    // The compiler must not have been started.
    assert_eq!(observed.metrics.compiler_starts.load(Ordering::SeqCst), 0);
  }

  /// Loads and evaluates `tests/circular1.js`, then checks the recorded
  /// resolve count and that the compiler was never started.
  #[test]
  fn execute_mod_circular() {
    let module_specifier =
      ModuleSpecifier::resolve_url_or_path("tests/circular1.js").unwrap();
    let state = ThreadSafeState::new(
      flags::DenoFlags::default(),
      vec![String::from("./deno"), module_specifier.to_string()],
      op_selector_std,
      Progress::new(),
      true,
    )
    .unwrap();
    // `state` moves into the task below; keep a handle for the metric checks.
    let observed = state.clone();
    tokio_util::run(lazy(move || {
      let mut worker =
        Worker::new("TEST".to_string(), StartupData::None, state);
      if let Err(err) = worker.execute_mod(&module_specifier, false) {
        eprintln!("execute_mod err {:?}", err);
      }
      tokio_util::panic_on_error(worker)
    }));

    assert_eq!(observed.metrics.resolve_count.load(Ordering::SeqCst), 2);
    // The compiler must not have been started.
    assert_eq!(observed.metrics.compiler_starts.load(Ordering::SeqCst), 0);
  }
178

R
Ryan Dahl 已提交
179 180
  /// Runs `tests/006_url_imports.ts` with the `reload` flag set, then checks
  /// the recorded resolve count and that the compiler was started once.
  #[test]
  fn execute_006_url_imports() {
    let module_specifier =
      ModuleSpecifier::resolve_url_or_path("tests/006_url_imports.ts").unwrap();
    let argv = vec![String::from("deno"), module_specifier.to_string()];
    let mut deno_flags = flags::DenoFlags::default();
    deno_flags.reload = true;
    let state = ThreadSafeState::new(
      deno_flags,
      argv,
      op_selector_std,
      Progress::new(),
      true,
    )
    .unwrap();
    // `state` moves into the task below; keep a handle for the metric checks.
    let observed = state.clone();
    tokio_util::run(lazy(move || {
      let mut worker = Worker::new(
        "TEST".to_string(),
        startup_data::deno_isolate_init(),
        state,
      );
      worker.execute("denoMain()").unwrap();
      if let Err(err) = worker.execute_mod(&module_specifier, false) {
        eprintln!("execute_mod err {:?}", err);
      }
      tokio_util::panic_on_error(worker)
    }));

    assert_eq!(observed.metrics.resolve_count.load(Ordering::SeqCst), 3);
    // The compiler must have been invoked exactly once.
    assert_eq!(observed.metrics.compiler_starts.load(Ordering::SeqCst), 1);
  }

210
  fn create_test_worker() -> Worker {
K
Kitson Kelly 已提交
211 212 213 214
    let state = ThreadSafeState::mock(vec![
      String::from("./deno"),
      String::from("hello.js"),
    ]);
215
    let mut worker =
216
      Worker::new("TEST".to_string(), startup_data::deno_isolate_init(), state);
217 218
    worker.execute("denoMain()").unwrap();
    worker.execute("workerMain()").unwrap();
219 220 221 222 223 224 225 226 227 228 229
    worker
  }

  /// Posts "hi" to a worker, expects `[1,2,3]` back, then posts "exit".
  #[test]
  fn test_worker_messages() {
    tokio_util::init(|| {
      let mut worker = create_test_worker();
      let handler_js = r#"
        onmessage = function(e) {
          console.log("msg from main script", e.data);
          if (e.data == "exit") {
            delete window.onmessage;
            return;
          } else {
            console.assert(e.data === "hi");
          }
          postMessage([1, 2, 3]);
          console.log("after postMessage");
        }
        "#;
      worker.execute(handler_js).unwrap();

      let resource = worker.state.resource.clone();
      let rid = resource.rid;

      // Drive the worker in the background; close its resource when it ends.
      tokio::spawn(lazy(move || {
        worker.then(move |outcome| -> Result<(), ()> {
          resource.close();
          outcome.unwrap();
          Ok(())
        })
      }));

      let hi = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
      assert!(resources::post_message_to_worker(rid, hi).wait().is_ok());

      let reply = resources::get_message_from_worker(rid).wait().unwrap();
      assert!(reply.is_some());
      // The reply must be the JSON text for [1, 2, 3].
      assert_eq!(*reply.unwrap(), *b"[1,2,3]");

      let exit = json!("exit")
        .to_string()
        .into_boxed_str()
        .into_boxed_bytes();
      assert!(resources::post_message_to_worker(rid, exit).wait().is_ok());
    })
  }

  /// Checks that the worker's resource id is registered as "worker" while the
  /// worker runs and is no longer known once the worker future has finished
  /// and its resource has been closed.
  #[test]
  fn removed_from_resource_table_on_close() {
    tokio_util::init(|| {
      let mut worker = create_test_worker();
      worker
        .execute("onmessage = () => { delete window.onmessage; }")
        .unwrap();

      let resource = worker.state.resource.clone();
      let rid = resource.rid;

      // Shared so that it can be both spawned and waited on below.
      let finished = worker
        .then(move |outcome| -> Result<(), ()> {
          resource.close();
          println!("workers.rs after resource close");
          outcome.unwrap();
          Ok(())
        })
        .shared();

      let finished_in_background = finished.clone();
      tokio::spawn(lazy(move || finished_in_background.then(|_| Ok(()))));

      assert_eq!(resources::get_type(rid), Some("worker".to_string()));

      let hi = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
      assert!(resources::post_message_to_worker(rid, hi).wait().is_ok());
      debug!("rid {:?}", rid);

      finished.wait().unwrap();
      assert_eq!(resources::get_type(rid), None);
    })
  }
307 308 309

  /// Loading a module that does not exist must yield an error.
  #[test]
  fn execute_mod_resolve_error() {
    tokio_util::init(|| {
      let mut worker = create_test_worker();
      // "does-not-exist" still resolves to a module specifier (the `unwrap()`
      // relies on that); the error is expected from executing the module.
      let module_specifier =
        ModuleSpecifier::resolve_url_or_path("does-not-exist").unwrap();
      let result = worker.execute_mod_async(&module_specifier, false).wait();
      assert!(result.is_err());
    })
  }

  /// Loading and evaluating `./tests/002_hello.ts` must succeed.
  #[test]
  fn execute_mod_002_hello() {
    tokio_util::init(|| {
      // The relative path below assumes cwd is the project root, as the
      // tests do throughout.
      let mut worker = create_test_worker();
      let module_specifier =
        ModuleSpecifier::resolve_url_or_path("./tests/002_hello.ts").unwrap();
      assert!(worker
        .execute_mod_async(&module_specifier, false)
        .wait()
        .is_ok());
    })
  }
332
}