dispatch_json.rs 2.9 KB
Newer Older
1 2
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use deno::*;
B
Bartek Iwańczuk 已提交
3 4
use futures::future::FutureExt;
use futures::task::SpawnExt;
5 6 7
pub use serde_derive::Deserialize;
use serde_json::json;
pub use serde_json::Value;
B
Bartek Iwańczuk 已提交
8 9
use std::future::Future;
use std::pin::Pin;
10

B
Bartek Iwańczuk 已提交
11 12
/// Boxed, pinned, `Send` future that resolves to a JSON `Value` on success
/// or an `ErrBox` on failure.
pub type AsyncJsonOp =
  Pin<Box<dyn Future<Output = Result<Value, ErrBox>> + Send>>;
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34

/// Outcome returned by a JSON op dispatcher: either a value that is already
/// available or a future that will produce one.
pub enum JsonOp {
  /// The result value is available immediately.
  Sync(Value),
  /// The result will be produced by the wrapped future.
  Async(AsyncJsonOp),
}

/// Builds the JSON error payload for `err`: its display text under
/// `message` and its error kind, cast to `u32`, under `kind`.
fn json_err(err: ErrBox) -> Value {
  use crate::deno_error::GetErrorKind;
  let message = err.to_string();
  let kind = err.kind() as u32;
  json!({ "message": message, "kind": kind })
}

fn serialize_result(
  promise_id: Option<u64>,
  result: Result<Value, ErrBox>,
) -> Buf {
  let value = match result {
    Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
    Err(err) => json!({ "err": json_err(err), "promiseId": promise_id }),
  };
35 36 37 38
  let mut vec = serde_json::to_vec(&value).unwrap();
  debug!("JSON response pre-align, len={}", vec.len());
  // Align to 32bit word, padding with the space character.
  vec.resize((vec.len() + 3usize) & !3usize, b' ');
39 40 41 42 43 44 45 46 47
  vec.into_boxed_slice()
}

/// The part of the control message that `json_op` inspects itself, before
/// handing the full message to the dispatcher.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct AsyncArgs {
  // Read from the `promiseId` key (camelCase rename). `json_op` treats
  // `None` as a synchronous call.
  promise_id: Option<u64>,
}

48 49 50 51 52
pub fn json_op<D>(d: D) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp
where
  D: Fn(Value, Option<PinnedBuf>) -> Result<JsonOp, ErrBox>,
{
  move |control: &[u8], zero_copy: Option<PinnedBuf>| {
53 54 55 56 57 58 59
    let async_args: AsyncArgs = match serde_json::from_slice(control) {
      Ok(args) => args,
      Err(e) => {
        let buf = serialize_result(None, Err(ErrBox::from(e)));
        return CoreOp::Sync(buf);
      }
    };
60 61
    let promise_id = async_args.promise_id;
    let is_sync = promise_id.is_none();
62

63 64 65 66 67 68 69 70 71 72 73 74
    let result = serde_json::from_slice(control)
      .map_err(ErrBox::from)
      .and_then(|args| d(args, zero_copy));

    // Convert to CoreOp
    match result {
      Ok(JsonOp::Sync(sync_value)) => {
        assert!(promise_id.is_none());
        CoreOp::Sync(serialize_result(promise_id, Ok(sync_value)))
      }
      Ok(JsonOp::Async(fut)) => {
        assert!(promise_id.is_some());
B
Bartek Iwańczuk 已提交
75 76 77 78
        let fut2 = fut.then(move |result| {
          futures::future::ok(serialize_result(promise_id, result))
        });
        CoreOp::Async(fut2.boxed())
79 80 81 82 83 84
      }
      Err(sync_err) => {
        let buf = serialize_result(promise_id, Err(sync_err));
        if is_sync {
          CoreOp::Sync(buf)
        } else {
B
Bartek Iwańczuk 已提交
85
          CoreOp::Async(futures::future::ok(buf).boxed())
86
        }
87 88 89 90 91 92 93
      }
    }
  }
}

pub fn blocking_json<F>(is_sync: bool, f: F) -> Result<JsonOp, ErrBox>
where
B
Bartek Iwańczuk 已提交
94
  F: 'static + Send + FnOnce() -> Result<Value, ErrBox> + Unpin,
95 96 97 98
{
  if is_sync {
    Ok(JsonOp::Sync(f()?))
  } else {
B
Bartek Iwańczuk 已提交
99 100 101 102 103 104
    //TODO(afinch7) replace this with something more efficent.
    let pool = futures::executor::ThreadPool::new().unwrap();
    let handle = pool
      .spawn_with_handle(futures::future::lazy(move |_cx| f()))
      .unwrap();
    Ok(JsonOp::Async(handle.boxed()))
105 106
  }
}