shared_queue.rs 9.0 KB
Newer Older
1
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
SharedQueue Binary Layout
+-------------------------------+-------------------------------+
|                        NUM_RECORDS (32)                       |
+---------------------------------------------------------------+
|                        NUM_SHIFTED_OFF (32)                   |
+---------------------------------------------------------------+
|                        HEAD (32)                              |
+---------------------------------------------------------------+
|                        OFFSETS (32)                           |
+---------------------------------------------------------------+
|                        RECORD_ENDS (*MAX_RECORDS)           ...
+---------------------------------------------------------------+
|                        RECORDS (*MAX_RECORDS)               ...
+---------------------------------------------------------------+
 */

B
Bert Belder 已提交
19
use crate::bindings;
20
use crate::ops::OpId;
B
Bert Belder 已提交
21
use rusty_v8 as v8;
R
Ryan Dahl 已提交
22 23 24 25 26 27 28 29 30 31

const MAX_RECORDS: usize = 100;
/// Total number of records added.
const INDEX_NUM_RECORDS: usize = 0;
/// Number of records that have been shifted off.
const INDEX_NUM_SHIFTED_OFF: usize = 1;
/// The head is the number of initialized bytes in SharedQueue.
/// It grows monotonically.
const INDEX_HEAD: usize = 2;
const INDEX_OFFSETS: usize = 3;
R
Ryan Dahl 已提交
32
const INDEX_RECORDS: usize = INDEX_OFFSETS + 2 * MAX_RECORDS;
R
Ryan Dahl 已提交
33 34 35 36 37 38
/// Byte offset of where the records begin. Also where the head starts.
const HEAD_INIT: usize = 4 * INDEX_RECORDS;
/// A rough guess at how big we should make the shared buffer in bytes.
pub const RECOMMENDED_SIZE: usize = 128 * MAX_RECORDS;

pub struct SharedQueue {
B
Bert Belder 已提交
39
  buf: v8::SharedRef<v8::BackingStore>,
R
Ryan Dahl 已提交
40 41 42 43
}

impl SharedQueue {
  pub fn new(len: usize) -> Self {
B
Bert Belder 已提交
44 45 46
    let mut buf = Vec::new();
    buf.resize(HEAD_INIT + len, 0);
    let buf = buf.into_boxed_slice();
R
Ryan Dahl 已提交
47
    let buf = v8::SharedArrayBuffer::new_backing_store_from_boxed_slice(buf);
R
Ryan Dahl 已提交
48 49 50
    let mut q = Self {
      buf: buf.make_shared(),
    };
R
Ryan Dahl 已提交
51 52 53 54
    q.reset();
    q
  }

B
Bert Belder 已提交
55 56 57 58 59
  pub fn get_backing_store(&mut self) -> &mut v8::SharedRef<v8::BackingStore> {
    &mut self.buf
  }

  pub fn bytes(&self) -> &[u8] {
B
Bert Belder 已提交
60 61 62
    unsafe {
      bindings::get_backing_store_slice(&self.buf, 0, self.buf.byte_length())
    }
B
Bert Belder 已提交
63 64 65
  }

  pub fn bytes_mut(&mut self) -> &mut [u8] {
B
Bert Belder 已提交
66 67 68 69 70 71 72
    unsafe {
      bindings::get_backing_store_slice_mut(
        &self.buf,
        0,
        self.buf.byte_length(),
      )
    }
B
Bert Belder 已提交
73 74
  }

75
  fn reset(&mut self) {
76
    debug!("rust:shared_queue:reset");
R
Ryan Dahl 已提交
77
    let s: &mut [u32] = self.as_u32_slice_mut();
78 79
    s[INDEX_NUM_RECORDS] = 0;
    s[INDEX_NUM_SHIFTED_OFF] = 0;
R
Ryan Dahl 已提交
80 81 82
    s[INDEX_HEAD] = HEAD_INIT as u32;
  }

83
  fn as_u32_slice(&self) -> &[u32] {
B
Bert Belder 已提交
84
    let p = self.bytes().as_ptr();
85 86 87 88
    // Assert pointer is 32 bit aligned before casting.
    assert_eq!((p as usize) % std::mem::align_of::<u32>(), 0);
    #[allow(clippy::cast_ptr_alignment)]
    let p32 = p as *const u32;
B
Bert Belder 已提交
89
    unsafe { std::slice::from_raw_parts(p32, self.bytes().len() / 4) }
R
Ryan Dahl 已提交
90 91
  }

92
  fn as_u32_slice_mut(&mut self) -> &mut [u32] {
B
Bert Belder 已提交
93
    let p = self.bytes_mut().as_mut_ptr();
94 95 96 97
    // Assert pointer is 32 bit aligned before casting.
    assert_eq!((p as usize) % std::mem::align_of::<u32>(), 0);
    #[allow(clippy::cast_ptr_alignment)]
    let p32 = p as *mut u32;
B
Bert Belder 已提交
98
    unsafe { std::slice::from_raw_parts_mut(p32, self.bytes().len() / 4) }
R
Ryan Dahl 已提交
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
  }

  pub fn size(&self) -> usize {
    let s = self.as_u32_slice();
    (s[INDEX_NUM_RECORDS] - s[INDEX_NUM_SHIFTED_OFF]) as usize
  }

  fn num_records(&self) -> usize {
    let s = self.as_u32_slice();
    s[INDEX_NUM_RECORDS] as usize
  }

  fn head(&self) -> usize {
    let s = self.as_u32_slice();
    s[INDEX_HEAD] as usize
  }

116 117
  fn num_shifted_off(&self) -> usize {
    let s = self.as_u32_slice();
B
Bert Belder 已提交
118
    s[INDEX_NUM_SHIFTED_OFF] as usize
119 120
  }

R
Ryan Dahl 已提交
121
  fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) {
R
Ryan Dahl 已提交
122
    let s = self.as_u32_slice_mut();
R
Ryan Dahl 已提交
123 124
    s[INDEX_OFFSETS + 2 * index] = end as u32;
    s[INDEX_OFFSETS + 2 * index + 1] = op_id;
R
Ryan Dahl 已提交
125 126
  }

R
Ryan Dahl 已提交
127
  #[cfg(test)]
R
Ryan Dahl 已提交
128
  fn get_meta(&self, index: usize) -> Option<(OpId, usize)> {
R
Ryan Dahl 已提交
129 130
    if index < self.num_records() {
      let s = self.as_u32_slice();
R
Ryan Dahl 已提交
131 132 133
      let end = s[INDEX_OFFSETS + 2 * index] as usize;
      let op_id = s[INDEX_OFFSETS + 2 * index + 1];
      Some((op_id, end))
R
Ryan Dahl 已提交
134 135 136 137 138
    } else {
      None
    }
  }

R
Ryan Dahl 已提交
139
  #[cfg(test)]
R
Ryan Dahl 已提交
140 141 142 143 144 145
  fn get_offset(&self, index: usize) -> Option<usize> {
    if index < self.num_records() {
      Some(if index == 0 {
        HEAD_INIT
      } else {
        let s = self.as_u32_slice();
146 147
        let prev_end = s[INDEX_OFFSETS + 2 * (index - 1)] as usize;
        (prev_end + 3) & !3
R
Ryan Dahl 已提交
148 149 150 151 152 153 154
      })
    } else {
      None
    }
  }

  /// Returns none if empty.
R
Ryan Dahl 已提交
155
  #[cfg(test)]
R
Ryan Dahl 已提交
156
  pub fn shift(&mut self) -> Option<(OpId, &[u8])> {
R
Ryan Dahl 已提交
157 158
    let u32_slice = self.as_u32_slice();
    let i = u32_slice[INDEX_NUM_SHIFTED_OFF] as usize;
159 160
    if self.size() == 0 {
      assert_eq!(i, 0);
R
Ryan Dahl 已提交
161 162
      return None;
    }
163

R
Ryan Dahl 已提交
164
    let off = self.get_offset(i).unwrap();
R
Ryan Dahl 已提交
165
    let (op_id, end) = self.get_meta(i).unwrap();
166 167 168 169 170 171
    if self.size() > 1 {
      let u32_slice = self.as_u32_slice_mut();
      u32_slice[INDEX_NUM_SHIFTED_OFF] += 1;
    } else {
      self.reset();
    }
R
Ryan Dahl 已提交
172
    println!(
173 174 175 176 177
      "rust:shared_queue:shift: num_records={}, num_shifted_off={}, head={}",
      self.num_records(),
      self.num_shifted_off(),
      self.head()
    );
B
Bert Belder 已提交
178
    Some((op_id, &self.bytes()[off..end]))
R
Ryan Dahl 已提交
179 180
  }

181 182
  /// Because JS-side may cast popped message to Int32Array it is required
  /// that every message is aligned to 4-bytes.
R
Ryan Dahl 已提交
183
  pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool {
R
Ryan Dahl 已提交
184
    let off = self.head();
185
    assert_eq!(off % 4, 0);
186
    let end = off + record.len();
187
    let aligned_end = (end + 3) & !3;
188
    debug!(
189
      "rust:shared_queue:pre-push: op={}, off={}, end={}, len={}, aligned_end={}",
190 191 192
      op_id,
      off,
      end,
193 194
      record.len(),
      aligned_end,
195
    );
R
Ryan Dahl 已提交
196
    let index = self.num_records();
197
    if aligned_end > self.bytes().len() || index >= MAX_RECORDS {
198
      debug!("WARNING the sharedQueue overflowed");
R
Ryan Dahl 已提交
199 200
      return false;
    }
201
    assert_eq!(aligned_end % 4, 0);
R
Ryan Dahl 已提交
202
    self.set_meta(index, end, op_id);
203
    assert_eq!(end - off, record.len());
B
Bert Belder 已提交
204
    self.bytes_mut()[off..end].copy_from_slice(record);
R
Ryan Dahl 已提交
205 206
    let u32_slice = self.as_u32_slice_mut();
    u32_slice[INDEX_NUM_RECORDS] += 1;
207
    u32_slice[INDEX_HEAD] = aligned_end as u32;
208 209 210 211 212 213
    debug!(
      "rust:shared_queue:push: num_records={}, num_shifted_off={}, head={}",
      self.num_records(),
      self.num_shifted_off(),
      self.head()
    );
R
Ryan Dahl 已提交
214 215 216 217 218 219 220
    true
  }
}

#[cfg(test)]
mod tests {
  use super::*;
221
  use crate::ops::Buf;
R
Ryan Dahl 已提交
222 223 224 225 226 227 228 229

  #[test]
  fn basic() {
    let mut q = SharedQueue::new(RECOMMENDED_SIZE);

    let h = q.head();
    assert!(h > 0);

230
    let r = vec![1u8, 2, 3, 4].into_boxed_slice();
231
    let len = r.len() + h;
R
Ryan Dahl 已提交
232
    assert!(q.push(0, &r));
R
Ryan Dahl 已提交
233 234
    assert_eq!(q.head(), len);

235
    let r = vec![5, 6, 7, 8].into_boxed_slice();
R
Ryan Dahl 已提交
236
    assert!(q.push(0, &r));
R
Ryan Dahl 已提交
237

238
    let r = vec![9, 10, 11, 12].into_boxed_slice();
R
Ryan Dahl 已提交
239
    assert!(q.push(0, &r));
R
Ryan Dahl 已提交
240 241 242
    assert_eq!(q.num_records(), 3);
    assert_eq!(q.size(), 3);

R
Ryan Dahl 已提交
243
    let (_op_id, r) = q.shift().unwrap();
244
    assert_eq!(r, vec![1, 2, 3, 4].as_slice());
R
Ryan Dahl 已提交
245 246 247
    assert_eq!(q.num_records(), 3);
    assert_eq!(q.size(), 2);

R
Ryan Dahl 已提交
248
    let (_op_id, r) = q.shift().unwrap();
249
    assert_eq!(r, vec![5, 6, 7, 8].as_slice());
R
Ryan Dahl 已提交
250 251 252
    assert_eq!(q.num_records(), 3);
    assert_eq!(q.size(), 1);

R
Ryan Dahl 已提交
253
    let (_op_id, r) = q.shift().unwrap();
254
    assert_eq!(r, vec![9, 10, 11, 12].as_slice());
255
    assert_eq!(q.num_records(), 0);
R
Ryan Dahl 已提交
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
    assert_eq!(q.size(), 0);

    assert!(q.shift().is_none());
    assert!(q.shift().is_none());

    assert_eq!(q.num_records(), 0);
    assert_eq!(q.size(), 0);
  }

  fn alloc_buf(byte_length: usize) -> Buf {
    let mut v = Vec::new();
    v.resize(byte_length, 0);
    v.into_boxed_slice()
  }

  #[test]
  fn overflow() {
    let mut q = SharedQueue::new(RECOMMENDED_SIZE);
274
    assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 5)));
R
Ryan Dahl 已提交
275
    assert_eq!(q.size(), 1);
276
    assert!(!q.push(0, &alloc_buf(6)));
R
Ryan Dahl 已提交
277
    assert_eq!(q.size(), 1);
278
    assert!(q.push(0, &alloc_buf(1)));
R
Ryan Dahl 已提交
279 280
    assert_eq!(q.size(), 2);

R
Ryan Dahl 已提交
281
    let (_op_id, buf) = q.shift().unwrap();
282
    assert_eq!(buf.len(), RECOMMENDED_SIZE - 5);
R
Ryan Dahl 已提交
283 284
    assert_eq!(q.size(), 1);

285
    assert!(!q.push(0, &alloc_buf(1)));
286

R
Ryan Dahl 已提交
287
    let (_op_id, buf) = q.shift().unwrap();
288
    assert_eq!(buf.len(), 1);
289
    assert_eq!(q.size(), 0);
R
Ryan Dahl 已提交
290
  }
291 292 293 294 295

  #[test]
  fn full_records() {
    let mut q = SharedQueue::new(RECOMMENDED_SIZE);
    for _ in 0..MAX_RECORDS {
296
      assert!(q.push(0, &alloc_buf(1)))
297
    }
298
    assert_eq!(q.push(0, &alloc_buf(1)), false);
299
    // Even if we shift one off, we still cannot push a new record.
R
Ryan Dahl 已提交
300
    let _ignored = q.shift().unwrap();
301
    assert_eq!(q.push(0, &alloc_buf(1)), false);
302 303 304
  }

  #[test]
305
  fn allow_any_buf_length() {
306
    let mut q = SharedQueue::new(RECOMMENDED_SIZE);
307 308 309 310 311 312 313 314
    // Check that `record` that has length not a multiple of 4 will
    // not cause panic. Still make sure that records are always
    // aligned to 4 bytes.
    for i in 1..9 {
      q.push(0, &alloc_buf(i));
      assert_eq!(q.num_records(), i);
      assert_eq!(q.head() % 4, 0);
    }
315
  }
R
Ryan Dahl 已提交
316
}