worker.rs 13.8 KB
Newer Older
R
Ryan Dahl 已提交
1
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
2
use crate::fmt_errors::JSError;
3
use crate::ops;
4
use crate::state::ThreadSafeState;
5
use deno;
6
use deno::Buf;
7
use deno::ErrBox;
8
use deno::ModuleSpecifier;
B
Bert Belder 已提交
9
use deno::RecursiveLoad;
R
Ryan Dahl 已提交
10
use deno::StartupData;
B
Bartek Iwańczuk 已提交
11 12 13 14 15
use futures::channel::mpsc;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
16
use std::env;
B
Bartek Iwańczuk 已提交
17 18
use std::future::Future;
use std::pin::Pin;
19 20
use std::sync::Arc;
use std::sync::Mutex;
B
Bartek Iwańczuk 已提交
21 22
use std::task::Context;
use std::task::Poll;
23
use url::Url;
B
Bartek Iwańczuk 已提交
24

25
/// Wraps mpsc channels so they can be referenced
26 27 28 29 30 31 32
/// from ops and used to facilitate parent-child communication
/// for workers.
pub struct WorkerChannels {
  pub sender: mpsc::Sender<Buf>,
  pub receiver: mpsc::Receiver<Buf>,
}

33
/// Wraps deno::Isolate to provide source maps, ops for the CLI, and
34
/// high-level module loading.
35
#[derive(Clone)]
36
pub struct Worker {
37
  pub name: String,
38
  isolate: Arc<Mutex<deno::Isolate>>,
39
  pub state: ThreadSafeState,
40
  external_channels: Arc<Mutex<WorkerChannels>>,
41 42
}

43 44
impl Worker {
  pub fn new(
45
    name: String,
46
    startup_data: StartupData,
47
    state: ThreadSafeState,
48
    external_channels: WorkerChannels,
49
  ) -> Self {
50 51 52
    let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false)));
    {
      let mut i = isolate.lock().unwrap();
53

54 55 56 57 58 59 60
      ops::compiler::init(&mut i, &state);
      ops::errors::init(&mut i, &state);
      ops::fetch::init(&mut i, &state);
      ops::files::init(&mut i, &state);
      ops::fs::init(&mut i, &state);
      ops::io::init(&mut i, &state);
      ops::net::init(&mut i, &state);
61
      ops::tls::init(&mut i, &state);
62 63 64 65 66 67 68 69
      ops::os::init(&mut i, &state);
      ops::permissions::init(&mut i, &state);
      ops::process::init(&mut i, &state);
      ops::random::init(&mut i, &state);
      ops::repl::init(&mut i, &state);
      ops::resources::init(&mut i, &state);
      ops::timers::init(&mut i, &state);
      ops::workers::init(&mut i, &state);
B
Bert Belder 已提交
70 71 72 73 74 75 76 77 78 79 80 81 82

      let state_ = state.clone();
      i.set_dyn_import(move |id, specifier, referrer| {
        let load_stream = RecursiveLoad::dynamic_import(
          id,
          specifier,
          referrer,
          state_.clone(),
          state_.modules.clone(),
        );
        Box::new(load_stream)
      });

83
      let global_state_ = state.global_state.clone();
84
      i.set_js_error_create(move |v8_exception| {
85
        JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
86
      })
87
    }
88

89 90 91 92
    Self {
      name,
      isolate,
      state,
93
      external_channels: Arc::new(Mutex::new(external_channels)),
94
    }
95 96
  }

97
  /// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
98
  pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> {
99 100 101
    let path = env::current_dir().unwrap().join("__anonymous__");
    let url = Url::from_file_path(path).unwrap();
    self.execute2(url.as_str(), js_source)
102 103 104 105 106
  }

  /// Executes the provided JavaScript source code. The js_filename argument is
  /// provided only for debugging purposes.
  pub fn execute2(
107
    &mut self,
108 109
    js_filename: &str,
    js_source: &str,
110
  ) -> Result<(), ErrBox> {
111
    let mut isolate = self.isolate.lock().unwrap();
112
    isolate.execute(js_filename, js_source)
113 114
  }

115
  /// Executes the provided JavaScript module.
116
  pub fn execute_mod_async(
117
    &mut self,
118
    module_specifier: &ModuleSpecifier,
119
    maybe_code: Option<String>,
120
    is_prefetch: bool,
B
Bartek Iwańczuk 已提交
121
  ) -> impl Future<Output = Result<(), ErrBox>> {
122 123
    let worker = self.clone();
    let loader = self.state.clone();
124
    let isolate = self.isolate.clone();
125
    let modules = self.state.modules.clone();
126 127 128 129 130
    let recursive_load = RecursiveLoad::main(
      &module_specifier.to_string(),
      maybe_code,
      loader,
      modules,
131 132 133 134
    );

    async move {
      let id = recursive_load.get_future(isolate).await?;
135
      worker.state.global_state.progress.done();
136 137

      if !is_prefetch {
138
        let mut isolate = worker.isolate.lock().unwrap();
139
        return isolate.mod_evaluate(id);
140
      }
141 142 143

      Ok(())
    }
144
  }
145

146 147 148
  /// Post message to worker as a host.
  ///
  /// This method blocks current thread.
149 150 151 152
  pub fn post_message(
    self: &Self,
    buf: Buf,
  ) -> impl Future<Output = Result<(), ErrBox>> {
B
Bartek Iwańczuk 已提交
153 154
    let channels = self.external_channels.lock().unwrap();
    let mut sender = channels.sender.clone();
155 156 157 158 159
    async move {
      let result = sender.send(buf).map_err(ErrBox::from).await;
      drop(sender);
      result
    }
160 161
  }

162
  /// Get message from worker as a host.
163
  pub fn get_message(self: &Self) -> WorkerReceiver {
164 165 166
    WorkerReceiver {
      channels: self.external_channels.clone(),
    }
167
  }
168
}
169

170
impl Future for Worker {
B
Bartek Iwańczuk 已提交
171
  type Output = Result<(), ErrBox>;
172

B
Bartek Iwańczuk 已提交
173 174 175 176
  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
    let inner = self.get_mut();
    let mut isolate = inner.isolate.lock().unwrap();
    isolate.poll_unpin(cx)
177 178
  }
}
179

180 181 182 183
/// This structure wraps worker's resource id to implement future
/// that will return message received from worker or None
/// if worker's channel has been closed.
pub struct WorkerReceiver {
184
  channels: Arc<Mutex<WorkerChannels>>,
185 186 187
}

impl Future for WorkerReceiver {
B
Bartek Iwańczuk 已提交
188
  type Output = Result<Option<Buf>, ErrBox>;
189

B
Bartek Iwańczuk 已提交
190
  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
191
    let mut channels = self.channels.lock().unwrap();
B
Bartek Iwańczuk 已提交
192 193 194 195
    match channels.receiver.poll_next_unpin(cx) {
      Poll::Ready(v) => Poll::Ready(Ok(v)),
      Poll::Pending => Poll::Pending,
    }
196 197 198
  }
}

R
Ryan Dahl 已提交
199 200 201
#[cfg(test)]
mod tests {
  use super::*;
202
  use crate::flags;
203 204
  use crate::flags::DenoFlags;
  use crate::global_state::ThreadSafeGlobalState;
R
Ryan Dahl 已提交
205
  use crate::progress::Progress;
206
  use crate::startup_data;
207
  use crate::state::ThreadSafeState;
208
  use crate::tokio_util;
209
  use futures::executor::block_on;
210
  use std::sync::atomic::Ordering;
211

212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
  /// Runs the closure `f` inside a future driven by `tokio_util::run`.
  pub fn run_in_task<F>(f: F)
  where
    F: FnOnce() + Send + 'static,
  {
    // The closure only runs once the lazy future is polled by the runtime.
    tokio_util::run(futures::future::lazy(move |_| {
      f();
      Ok(())
    }))
  }

  /// Adapts a fallible future so that any error it produces panics with the
  /// error's `Debug` representation instead of being returned.
  pub fn panic_on_error<I, E, F>(f: F) -> impl Future<Output = Result<I, ()>>
  where
    F: Future<Output = Result<I, E>>,
    E: std::fmt::Debug,
  {
    f.map_err(|unexpected| {
      panic!("Future got unexpected error: {:?}", unexpected)
    })
  }

232
  /// Loads and evaluates `tests/esm_imports_a.js`, then checks that exactly
  /// two specifiers were resolved and that the compiler was never started.
  #[test]
  fn execute_mod_esm_imports_a() {
    let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
      .parent()
      .unwrap()
      .join("tests/esm_imports_a.js");
    let module_specifier =
      ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
    let argv = vec![String::from("./deno"), module_specifier.to_string()];
    let global_state = ThreadSafeGlobalState::new(
      flags::DenoFlags::default(),
      argv,
      Progress::new(),
    )
    .unwrap();
    let (int, ext) = ThreadSafeState::create_channels();
    let state = ThreadSafeState::new(
      global_state,
      Some(module_specifier.clone()),
      true,
      int,
    )
    .unwrap();
    // Keep a handle on the state so the metrics can be read after the
    // worker has consumed the original.
    let state_ = state.clone();
    tokio_util::run(async move {
      let mut worker =
        Worker::new("TEST".to_string(), StartupData::None, state, ext);
      let result = worker
        .execute_mod_async(&module_specifier, None, false)
        .await;
      if let Err(err) = result {
        eprintln!("execute_mod err {:?}", err);
      }
      panic_on_error(worker).await
    });

    let metrics = &state_.metrics;
    assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2);
    // Check that we didn't start the compiler.
    assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0);
  }

  /// Loads and evaluates `tests/circular1.ts`, then checks that the compiler
  /// was never started.
  #[test]
  fn execute_mod_circular() {
    let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
      .parent()
      .unwrap()
      .join("tests/circular1.ts");
    let module_specifier =
      ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
    let argv = vec![String::from("deno"), module_specifier.to_string()];
    let global_state =
      ThreadSafeGlobalState::new(DenoFlags::default(), argv, Progress::new())
        .unwrap();
    let (int, ext) = ThreadSafeState::create_channels();
    let state = ThreadSafeState::new(
      global_state,
      Some(module_specifier.clone()),
      true,
      int,
    )
    .unwrap();
    // Keep a handle on the state so the metrics can be read after the
    // worker has consumed the original.
    let state_ = state.clone();
    tokio_util::run(async move {
      let mut worker =
        Worker::new("TEST".to_string(), StartupData::None, state, ext);
      let result = worker
        .execute_mod_async(&module_specifier, None, false)
        .await;
      if let Err(err) = result {
        eprintln!("execute_mod err {:?}", err);
      }
      panic_on_error(worker).await
    });

    let metrics = &state_.metrics;
    // TODO  assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2);
    // Check that we didn't start the compiler.
    assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0);
  }
314

R
Ryan Dahl 已提交
315 316
  /// Loads and evaluates `cli/tests/006_url_imports.ts` with the `reload`
  /// flag set while the test HTTP server is running, then checks that three
  /// specifiers were resolved and the compiler was started exactly once.
  #[test]
  fn execute_006_url_imports() {
    // The guard is held until the end of the test and dropped explicitly
    // below.
    let http_server_guard = crate::test_util::http_server();

    let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
      .parent()
      .unwrap()
      .join("cli/tests/006_url_imports.ts");
    let module_specifier =
      ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
    let argv = vec![String::from("deno"), module_specifier.to_string()];
    let mut flags = flags::DenoFlags::default();
    flags.reload = true;
    let global_state =
      ThreadSafeGlobalState::new(flags, argv, Progress::new()).unwrap();
    let (int, ext) = ThreadSafeState::create_channels();
    let state = ThreadSafeState::new(
      global_state.clone(),
      Some(module_specifier.clone()),
      true,
      int,
    )
    .unwrap();
    // Handles kept outside the async block so the metrics can be read once
    // the worker has finished.
    let global_state_ = global_state.clone();
    let state_ = state.clone();
    tokio_util::run(async move {
      let mut worker = Worker::new(
        "TEST".to_string(),
        startup_data::deno_isolate_init(),
        state,
        ext,
      );
      worker.execute("denoMain()").unwrap();
      let result = worker
        .execute_mod_async(&module_specifier, None, false)
        .await;

      if let Err(err) = result {
        eprintln!("execute_mod err {:?}", err);
      }
      panic_on_error(worker).await
    });

    assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3);
    // Check that we've only invoked the compiler once.
    assert_eq!(
      global_state_.metrics.compiler_starts.load(Ordering::SeqCst),
      1
    );
    drop(http_server_guard);
  }

368
  fn create_test_worker() -> Worker {
369 370 371 372 373 374 375 376 377 378 379
    let (int, ext) = ThreadSafeState::create_channels();
    let state = ThreadSafeState::mock(
      vec![String::from("./deno"), String::from("hello.js")],
      int,
    );
    let mut worker = Worker::new(
      "TEST".to_string(),
      startup_data::deno_isolate_init(),
      state,
      ext,
    );
380 381
    worker.execute("denoMain()").unwrap();
    worker.execute("workerMain()").unwrap();
382 383 384 385 386
    worker
  }

  /// Posts "hi" to a worker, expects `[1,2,3]` back, then posts "exit".
  #[test]
  fn test_worker_messages() {
    run_in_task(|| {
      let mut worker = create_test_worker();
      let source = r#"
        onmessage = function(e) {
          console.log("msg from main script", e.data);
          if (e.data == "exit") {
            delete window.onmessage;
            return;
          } else {
            console.assert(e.data === "hi");
          }
          postMessage([1, 2, 3]);
          console.log("after postMessage");
        }
        "#;
      worker.execute(source).unwrap();

      // `worker` is moved into the spawned task below; this clone is the
      // host-side handle used to exchange messages with it.
      let worker_ = worker.clone();

      let fut = async move {
        let r = worker.await;
        r.unwrap();
        Ok(())
      };

      tokio::spawn(fut.boxed().compat());

      let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();

      let r = block_on(worker_.post_message(msg));
      assert!(r.is_ok());

      let maybe_msg = block_on(worker_.get_message()).unwrap();
      assert!(maybe_msg.is_some());
      // Check if message received is [1, 2, 3] in json
      assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");

      let msg = json!("exit")
        .to_string()
        .into_boxed_str()
        .into_boxed_bytes();
      let r = block_on(worker_.post_message(msg));
      assert!(r.is_ok());
    })
  }

  /// Posts one message to a worker whose handler deletes `window.onmessage`
  /// and then waits for the worker future to resolve successfully.
  #[test]
  fn removed_from_resource_table_on_close() {
    run_in_task(|| {
      let mut worker = create_test_worker();
      worker
        .execute("onmessage = () => { delete window.onmessage; }")
        .unwrap();

      // `worker` is consumed by the future below; this clone is the
      // host-side handle used to post the message.
      let worker_ = worker.clone();
      // The future is shared so it can be both spawned on the runtime and
      // awaited at the end of the test.
      let worker_future = worker
        .then(move |r| {
          println!("workers.rs after resource close");
          r.unwrap();
          futures::future::ok(())
        })
        .shared();

      let worker_future_ = worker_future.clone();
      tokio::spawn(
        worker_future_
          .then(|_: Result<(), ()>| futures::future::ok(()))
          .compat(),
      );

      let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
      let r = block_on(worker_.post_message(msg));
      assert!(r.is_ok());

      block_on(worker_future).unwrap();
    })
  }
464 465 466

  /// Executing a module that cannot be loaded must yield an error.
  #[test]
  fn execute_mod_resolve_error() {
    run_in_task(|| {
      // "does-not-exist" resolves to a specifier without a problem (the
      // `unwrap` below succeeds); it is executing the module that should
      // return an error.
      // NOTE(review): presumably because no such file exists; confirm.
      let mut worker = create_test_worker();
      let module_specifier =
        ModuleSpecifier::resolve_url_or_path("does-not-exist").unwrap();
      let result =
        block_on(worker.execute_mod_async(&module_specifier, None, false));
      assert!(result.is_err());
    })
  }

  /// Loading and evaluating `tests/002_hello.ts` must succeed.
  #[test]
  fn execute_mod_002_hello() {
    run_in_task(|| {
      // This assumes cwd is project root (an assumption made throughout the
      // tests).
      let mut worker = create_test_worker();
      let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .parent()
        .unwrap()
        .join("tests/002_hello.ts");
      let module_specifier =
        ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
      let result =
        block_on(worker.execute_mod_async(&module_specifier, None, false));
      assert!(result.is_ok());
    })
  }
496
}