io.rs 9.7 KB
Newer Older
R
Ryan Dahl 已提交
1 2
use super::dispatch_minimal::MinimalOp;
use crate::deno_error;
3
use crate::deno_error::bad_resource;
4
use crate::http_body::HttpBody;
5 6
use crate::ops::minimal_op;
use crate::state::ThreadSafeState;
7 8
use deno::ErrBox;
use deno::Resource;
9
use deno::*;
10
use futures;
B
Bartek Iwańczuk 已提交
11 12 13 14
use futures::compat::AsyncRead01CompatExt;
use futures::compat::AsyncWrite01CompatExt;
use futures::future::FutureExt;
use futures::io::{AsyncRead, AsyncWrite};
15
use std;
B
Bartek Iwańczuk 已提交
16 17 18 19
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
use tokio;
use tokio::net::TcpStream;
use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream;

#[cfg(not(windows))]
use std::os::unix::io::FromRawFd;

#[cfg(windows)]
use std::os::windows::io::FromRawHandle;

#[cfg(windows)]
extern crate winapi;

lazy_static! {
  /// Process-wide handle to stdout, built once from the raw OS descriptor.
  ///
  /// Due to portability issues on Windows the handle is created from the raw
  /// file descriptor / handle. A `File` built that way closes the underlying
  /// descriptor when it is dropped, which is highly undesirable for stdout.
  /// The handle is therefore kept in this global and only clones of it are
  /// handed out when obtaining stdio for the process. When a resource table
  /// holding such a clone is dropped, this handle itself is not closed, so
  /// `Deno.core.print` keeps working.
  static ref STDOUT_HANDLE: std::fs::File = {
    // SAFETY(review): both branches assume the standard-output descriptor /
    // handle is valid for the process — TODO confirm for embedders that
    // close or redirect stdout before this is first accessed.
    #[cfg(not(windows))]
    let handle = unsafe { std::fs::File::from_raw_fd(1) };
    #[cfg(windows)]
    let handle = unsafe {
      std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
        winapi::um::winbase::STD_OUTPUT_HANDLE,
      ))
    };

    handle
  };
}
R
Ryan Dahl 已提交
55

56
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
  i.register_op(
    "read",
    s.core_op(minimal_op(s.stateful_minimal_op(op_read))),
  );
  i.register_op(
    "write",
    s.core_op(minimal_op(s.stateful_minimal_op(op_write))),
  );
}

/// Builds the three standard stream resources as `(stdin, stdout, stderr)`.
///
/// Stdout is backed by a clone of the global `STDOUT_HANDLE`, so dropping the
/// returned resource does not close the global handle.
///
/// # Panics
///
/// Panics with "Unable to clone stdout handle" if the global handle cannot be
/// duplicated.
pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
  let stdin_stream = tokio::io::stdin();

  let stdout_clone = STDOUT_HANDLE
    .try_clone()
    .expect("Unable to clone stdout handle");
  let stdout_file = tokio::fs::File::from_std(stdout_clone);

  let stderr_stream = tokio::io::stderr();

  (
    StreamResource::Stdin(stdin_stream),
    StreamResource::Stdout(stdout_file),
    StreamResource::Stderr(stderr_stream),
  )
}

pub enum StreamResource {
  Stdin(tokio::io::Stdin),
  Stdout(tokio::fs::File),
  Stderr(tokio::io::Stderr),
  FsFile(tokio::fs::File),
  TcpStream(tokio::net::TcpStream),
  ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
  ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
B
Bartek Iwańczuk 已提交
88
  HttpBody(Box<HttpBody>),
89 90 91 92 93 94 95 96 97 98
  ChildStdin(tokio_process::ChildStdin),
  ChildStdout(tokio_process::ChildStdout),
  ChildStderr(tokio_process::ChildStderr),
}

// Empty impl: `StreamResource` overrides nothing and relies entirely on
// whatever defaults the `Resource` trait provides. This is what allows
// `get_mut::<StreamResource>(rid)` lookups on the resource table.
impl Resource for StreamResource {}

/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncRead {
B
Bartek Iwańczuk 已提交
99 100 101 102 103
  fn poll_read(
    self: Pin<&mut Self>,
    cx: &mut Context,
    buf: &mut [u8],
  ) -> Poll<Result<usize, ErrBox>>;
104 105 106
}

impl DenoAsyncRead for StreamResource {
B
Bartek Iwańczuk 已提交
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
  fn poll_read(
    self: Pin<&mut Self>,
    cx: &mut Context,
    buf: &mut [u8],
  ) -> Poll<Result<usize, ErrBox>> {
    let inner = self.get_mut();
    let mut f: Box<dyn AsyncRead + Unpin> = match inner {
      StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)),
      StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)),
      StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)),
      StreamResource::ClientTlsStream(f) => {
        Box::new(AsyncRead01CompatExt::compat(f))
      }
      StreamResource::ServerTlsStream(f) => {
        Box::new(AsyncRead01CompatExt::compat(f))
      }
      StreamResource::HttpBody(f) => Box::new(f),
      StreamResource::ChildStdout(f) => {
        Box::new(AsyncRead01CompatExt::compat(f))
      }
      StreamResource::ChildStderr(f) => {
        Box::new(AsyncRead01CompatExt::compat(f))
      }
130
      _ => {
B
Bartek Iwańczuk 已提交
131
        return Poll::Ready(Err(bad_resource()));
132 133 134
      }
    };

B
Bartek Iwańczuk 已提交
135 136 137 138 139 140 141
    let r = AsyncRead::poll_read(Pin::new(&mut f), cx, buf);

    match r {
      Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
      Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
      Poll::Pending => Poll::Pending,
    }
142
  }
143 144
}

145 146 147 148 149 150 151 152 153 154 155
/// Completion state tracked by the `Read` and `Write` futures.
#[derive(Debug, PartialEq)]
enum IoState {
  /// The future has not yet resolved with a successful result.
  Pending,
  /// The future already resolved successfully; polling it again panics.
  Done,
}

/// Tries to read some bytes directly into the given `buf` in asynchronous
/// manner, returning a future type.
///
/// The returned future will resolve to both the I/O stream and the buffer
/// as well as the number of bytes read once the read operation is completed.
156
pub fn read<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read<T>
157 158 159 160 161 162
where
  T: AsMut<[u8]>,
{
  Read {
    rid,
    buf,
163 164
    io_state: IoState::Pending,
    state: state.clone(),
165 166 167 168 169 170 171 172 173 174
  }
}

/// A future which can be used to easily read available number of bytes to fill
/// a buffer.
///
/// Created by the [`read`] function.
pub struct Read<T> {
  rid: ResourceId,
  buf: T,
175 176
  io_state: IoState,
  state: ThreadSafeState,
177 178 179 180
}

impl<T> Future for Read<T>
where
B
Bartek Iwańczuk 已提交
181
  T: AsMut<[u8]> + Unpin,
182
{
B
Bartek Iwańczuk 已提交
183
  type Output = Result<i32, ErrBox>;
184

B
Bartek Iwańczuk 已提交
185 186 187
  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
    let inner = self.get_mut();
    if inner.io_state == IoState::Done {
188 189 190
      panic!("poll a Read after it's done");
    }

B
Bartek Iwańczuk 已提交
191
    let mut table = inner.state.lock_resource_table();
192
    let resource = table
B
Bartek Iwańczuk 已提交
193
      .get_mut::<StreamResource>(inner.rid)
194
      .ok_or_else(bad_resource)?;
B
Bartek Iwańczuk 已提交
195 196 197 198 199 200 201 202 203 204 205
    let nread = match DenoAsyncRead::poll_read(
      Pin::new(resource),
      cx,
      &mut inner.buf.as_mut()[..],
    ) {
      Poll::Ready(Ok(v)) => v,
      Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
      Poll::Pending => return Poll::Pending,
    };
    inner.io_state = IoState::Done;
    Poll::Ready(Ok(nread as i32))
206 207 208
  }
}

209 210 211 212
pub fn op_read(
  state: &ThreadSafeState,
  rid: i32,
  zero_copy: Option<PinnedBuf>,
B
Bartek Iwańczuk 已提交
213
) -> Pin<Box<MinimalOp>> {
R
Ryan Dahl 已提交
214 215 216
  debug!("read rid={}", rid);
  let zero_copy = match zero_copy {
    None => {
B
Bartek Iwańczuk 已提交
217
      return futures::future::err(deno_error::no_buffer_specified()).boxed()
R
Ryan Dahl 已提交
218 219 220
    }
    Some(buf) => buf,
  };
221

B
Bartek Iwańczuk 已提交
222
  let fut = read(state, rid as u32, zero_copy);
223

B
Bartek Iwańczuk 已提交
224
  fut.boxed()
225 226
}

227 228 229
/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncWrite {
B
Bartek Iwańczuk 已提交
230 231 232 233 234 235 236 237 238 239
  fn poll_write(
    self: Pin<&mut Self>,
    cx: &mut Context,
    buf: &[u8],
  ) -> Poll<Result<usize, ErrBox>>;

  fn poll_close(
    self: Pin<&mut Self>,
    cx: &mut Context,
  ) -> Poll<Result<(), ErrBox>>;
240 241 242
}

impl DenoAsyncWrite for StreamResource {
B
Bartek Iwańczuk 已提交
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
  fn poll_write(
    self: Pin<&mut Self>,
    cx: &mut Context,
    buf: &[u8],
  ) -> Poll<Result<usize, ErrBox>> {
    let inner = self.get_mut();
    let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
      StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
      StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
      StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
      StreamResource::TcpStream(f) => {
        Box::new(AsyncWrite01CompatExt::compat(f))
      }
      StreamResource::ClientTlsStream(f) => {
        Box::new(AsyncWrite01CompatExt::compat(f))
      }
      StreamResource::ServerTlsStream(f) => {
        Box::new(AsyncWrite01CompatExt::compat(f))
      }
      StreamResource::ChildStdin(f) => {
        Box::new(AsyncWrite01CompatExt::compat(f))
      }
265
      _ => {
B
Bartek Iwańczuk 已提交
266
        return Poll::Ready(Err(bad_resource()));
267 268 269
      }
    };

B
Bartek Iwańczuk 已提交
270 271 272 273 274 275 276
    let r = AsyncWrite::poll_write(Pin::new(&mut f), cx, buf);

    match r {
      Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
      Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
      Poll::Pending => Poll::Pending,
    }
277 278
  }

B
Bartek Iwańczuk 已提交
279 280 281 282
  fn poll_close(
    self: Pin<&mut Self>,
    _cx: &mut Context,
  ) -> Poll<Result<(), ErrBox>> {
283 284 285 286
    unimplemented!()
  }
}

287 288 289 290
/// A future used to write some data to a stream.
pub struct Write<T> {
  rid: ResourceId,
  buf: T,
291 292
  io_state: IoState,
  state: ThreadSafeState,
293 294 295 296 297 298 299
}

/// Creates a future that will write some of the buffer `buf` to
/// the stream resource with `rid`.
///
/// Any error which happens during writing will cause both the stream and the
/// buffer to get destroyed.
300
pub fn write<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write<T>
301 302 303 304 305 306
where
  T: AsRef<[u8]>,
{
  Write {
    rid,
    buf,
307 308
    io_state: IoState::Pending,
    state: state.clone(),
309 310 311 312 313 314 315
  }
}

/// This is almost the same implementation as in tokio, difference is
/// that error type is `ErrBox` instead of `std::io::Error`.
impl<T> Future for Write<T>
where
B
Bartek Iwańczuk 已提交
316
  T: AsRef<[u8]> + Unpin,
317
{
B
Bartek Iwańczuk 已提交
318
  type Output = Result<i32, ErrBox>;
319

B
Bartek Iwańczuk 已提交
320 321 322
  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
    let inner = self.get_mut();
    if inner.io_state == IoState::Done {
323 324 325
      panic!("poll a Read after it's done");
    }

B
Bartek Iwańczuk 已提交
326
    let mut table = inner.state.lock_resource_table();
327
    let resource = table
B
Bartek Iwańczuk 已提交
328
      .get_mut::<StreamResource>(inner.rid)
329
      .ok_or_else(bad_resource)?;
B
Bartek Iwańczuk 已提交
330 331 332 333 334 335 336 337 338 339 340
    let nwritten = match DenoAsyncWrite::poll_write(
      Pin::new(resource),
      cx,
      inner.buf.as_ref(),
    ) {
      Poll::Ready(Ok(v)) => v,
      Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
      Poll::Pending => return Poll::Pending,
    };
    inner.io_state = IoState::Done;
    Poll::Ready(Ok(nwritten as i32))
R
Ryan Dahl 已提交
341 342 343
  }
}

344 345 346 347
pub fn op_write(
  state: &ThreadSafeState,
  rid: i32,
  zero_copy: Option<PinnedBuf>,
B
Bartek Iwańczuk 已提交
348
) -> Pin<Box<MinimalOp>> {
R
Ryan Dahl 已提交
349 350 351
  debug!("write rid={}", rid);
  let zero_copy = match zero_copy {
    None => {
B
Bartek Iwańczuk 已提交
352
      return futures::future::err(deno_error::no_buffer_specified()).boxed()
R
Ryan Dahl 已提交
353 354 355
    }
    Some(buf) => buf,
  };
356

B
Bartek Iwańczuk 已提交
357
  let fut = write(state, rid as u32, zero_copy);
358

B
Bartek Iwańczuk 已提交
359
  fut.boxed()
R
Ryan Dahl 已提交
360
}