state.rs 5.2 KB
Newer Older
1 2
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use crate::deno_dir;
3
use crate::errors::DenoResult;
4 5
use crate::flags;
use crate::global_timer::GlobalTimer;
6
use crate::ops;
7
use crate::permissions::DenoPermissions;
8
use crate::resources;
A
andy finch 已提交
9
use crate::resources::ResourceId;
10
use crate::worker::Worker;
11
use deno::deno_buf;
12
use deno::Buf;
13 14
use deno::Dispatch;
use deno::Op;
A
andy finch 已提交
15
use futures::future::Shared;
16
use std;
A
andy finch 已提交
17
use std::collections::HashMap;
18
use std::env;
19
use std::ops::Deref;
20
use std::sync::atomic::{AtomicUsize, Ordering};
21
use std::sync::Arc;
22
use std::sync::Mutex;
23
use std::time::Instant;
24
use tokio::sync::mpsc as async_mpsc;
25 26 27 28

pub type WorkerSender = async_mpsc::Sender<Buf>;
pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
pub type WorkerChannels = (WorkerSender, WorkerReceiver);
29
pub type UserWorkerTable = HashMap<ResourceId, Shared<Worker>>;
30 31 32 33 34 35 36 37 38 39 40 41

// AtomicU64 is currently unstable
#[derive(Default)]
pub struct Metrics {
  pub ops_dispatched: AtomicUsize,
  pub ops_completed: AtomicUsize,
  pub bytes_sent_control: AtomicUsize,
  pub bytes_sent_data: AtomicUsize,
  pub bytes_received: AtomicUsize,
  pub resolve_count: AtomicUsize,
}

42 43 44 45 46
// Wrap State so that it can implement Dispatch.
pub struct ThreadSafeState(Arc<State>);

// Isolate cannot be passed between threads but ThreadSafeState can.
// ThreadSafeState satisfies Send and Sync.
47
// So any state that needs to be accessed outside the main V8 thread should be
48
// inside ThreadSafeState.
49
#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
50
pub struct State {
51 52
  pub dir: deno_dir::DenoDir,
  pub argv: Vec<String>,
53
  pub permissions: DenoPermissions,
54 55
  pub flags: flags::DenoFlags,
  pub metrics: Metrics,
56
  pub worker_channels: Mutex<WorkerChannels>,
57
  pub global_timer: Mutex<GlobalTimer>,
A
andy finch 已提交
58
  pub workers: Mutex<UserWorkerTable>,
59
  pub start_time: Instant,
60
  pub resource: resources::Resource,
A
andy finch 已提交
61
  pub dispatch_selector: ops::OpSelector,
62 63
}

64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
impl Clone for ThreadSafeState {
  fn clone(&self) -> Self {
    ThreadSafeState(self.0.clone())
  }
}

impl Deref for ThreadSafeState {
  type Target = Arc<State>;
  fn deref(&self) -> &Self::Target {
    &self.0
  }
}

impl Dispatch for ThreadSafeState {
  fn dispatch(
    &mut self,
    control: &[u8],
    zero_copy: deno_buf,
  ) -> (bool, Box<Op>) {
A
andy finch 已提交
83
    ops::dispatch_all(self, control, zero_copy, self.dispatch_selector)
84 85 86 87
  }
}

impl ThreadSafeState {
A
andy finch 已提交
88 89 90 91 92
  pub fn new(
    flags: flags::DenoFlags,
    argv_rest: Vec<String>,
    dispatch_selector: ops::OpSelector,
  ) -> Self {
B
Bert Belder 已提交
93
    let custom_root = env::var("DENO_DIR").map(String::into).ok();
94

95 96 97 98 99 100
    let (worker_in_tx, worker_in_rx) = async_mpsc::channel::<Buf>(1);
    let (worker_out_tx, worker_out_rx) = async_mpsc::channel::<Buf>(1);
    let internal_channels = (worker_out_tx, worker_in_rx);
    let external_channels = (worker_in_tx, worker_out_rx);
    let resource = resources::add_worker(external_channels);

101
    ThreadSafeState(Arc::new(State {
102
      dir: deno_dir::DenoDir::new(custom_root).unwrap(),
103
      argv: argv_rest,
104
      permissions: DenoPermissions::from_flags(&flags),
105 106
      flags,
      metrics: Metrics::default(),
107
      worker_channels: Mutex::new(internal_channels),
108
      global_timer: Mutex::new(GlobalTimer::new()),
A
andy finch 已提交
109
      workers: Mutex::new(UserWorkerTable::new()),
110
      start_time: Instant::now(),
111
      resource,
A
andy finch 已提交
112
      dispatch_selector,
113
    }))
114 115
  }

A
andy finch 已提交
116
  /// Read main module from argv
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
  pub fn main_module(&self) -> Option<String> {
    if self.argv.len() <= 1 {
      None
    } else {
      let specifier = self.argv[1].clone();
      let referrer = ".";
      match self.dir.resolve_module_url(&specifier, referrer) {
        Ok(url) => Some(url.to_string()),
        Err(e) => {
          debug!("Potentially swallowed error {}", e);
          None
        }
      }
    }
  }

133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
  #[inline]
  pub fn check_read(&self, filename: &str) -> DenoResult<()> {
    self.permissions.check_read(filename)
  }

  #[inline]
  pub fn check_write(&self, filename: &str) -> DenoResult<()> {
    self.permissions.check_write(filename)
  }

  #[inline]
  pub fn check_env(&self) -> DenoResult<()> {
    self.permissions.check_env()
  }

  #[inline]
  pub fn check_net(&self, filename: &str) -> DenoResult<()> {
    self.permissions.check_net(filename)
  }

  #[inline]
  pub fn check_run(&self) -> DenoResult<()> {
    self.permissions.check_run()
  }

158
  #[cfg(test)]
159
  pub fn mock() -> ThreadSafeState {
160 161
    let argv = vec![String::from("./deno"), String::from("hello.js")];
    // For debugging: argv.push_back(String::from("-D"));
162
    let (flags, rest_argv) = flags::set_flags(argv).unwrap();
A
andy finch 已提交
163
    ThreadSafeState::new(flags, rest_argv, ops::op_selector_std)
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
  }

  pub fn metrics_op_dispatched(
    &self,
    bytes_sent_control: usize,
    bytes_sent_data: usize,
  ) {
    self.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst);
    self
      .metrics
      .bytes_sent_control
      .fetch_add(bytes_sent_control, Ordering::SeqCst);
    self
      .metrics
      .bytes_sent_data
      .fetch_add(bytes_sent_data, Ordering::SeqCst);
  }

  pub fn metrics_op_completed(&self, bytes_received: usize) {
    self.metrics.ops_completed.fetch_add(1, Ordering::SeqCst);
    self
      .metrics
      .bytes_received
      .fetch_add(bytes_received, Ordering::SeqCst);
  }
}
190 191 192 193 194 195

#[test]
fn thread_safe() {
  fn f<S: Send + Sync>(_: S) {}
  f(ThreadSafeState::mock());
}