From a787f4001388a394d5219b74113a718d980e4c90 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Tue, 3 Jul 2012 17:33:20 -0700 Subject: [PATCH] Select on pipes. Updating syntax and test cases. --- src/libcore/pipes.rs | 142 +++++++++++++++--- src/libcore/vec.rs | 43 +++++- src/test/bench/msgsend-ring-contracts.rs | 2 +- src/test/run-pass/pipe-manual-2.rs | 178 ----------------------- src/test/run-pass/pipe-manual-3.rs | 178 ----------------------- src/test/run-pass/pipe-select.rs | 125 ++++++++++++++++ 6 files changed, 284 insertions(+), 384 deletions(-) create mode 100644 src/test/run-pass/pipe-select.rs diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 88f69555439..f1617c59ac7 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -9,23 +9,29 @@ enum state { terminated } -type packet = { +type packet_header = { mut state: state, mut blocked_task: option<*rust_task>, +}; + +type packet = { + header: packet_header, mut payload: option }; fn packet() -> *packet unsafe { let p: *packet = unsafe::transmute(~{ - mut state: empty, - mut blocked_task: none::, + header: { + mut state: empty, + mut blocked_task: none::, + }, mut payload: none:: }); p } #[abi = "rust-intrinsic"] -native mod rusti { +extern mod rusti { fn atomic_xchng(&dst: int, src: int) -> int; fn atomic_xchng_acq(&dst: int, src: int) -> int; fn atomic_xchng_rel(&dst: int, src: int) -> int; @@ -33,7 +39,7 @@ fn packet() -> *packet unsafe { type rust_task = libc::c_void; -native mod rustrt { +extern mod rustrt { #[rust_stack] fn rust_get_task() -> *rust_task; @@ -71,7 +77,7 @@ fn send(-p: send_packet, -payload: T) { let p = unsafe { uniquify(p_) }; assert (*p).payload == none; (*p).payload <- some(payload); - let old_state = swap_state_rel((*p).state, full); + let old_state = swap_state_rel(p.header.state, full); alt old_state { empty { // Yay, fastpath. @@ -82,9 +88,10 @@ fn send(-p: send_packet, -payload: T) { full { fail "duplicate send" } blocked { #debug("waking up task for %?", p_); - alt p.blocked_task { + alt p.header.blocked_task { some(task) { - rustrt::task_signal_event(task, p_ as *libc::c_void); + rustrt::task_signal_event( + task, ptr::addr_of(p.header) as *libc::c_void); } none { fail "blocked packet has no task" } } @@ -104,20 +111,20 @@ fn recv(-p: recv_packet) -> option { let p = unsafe { uniquify(p_) }; let this = rustrt::rust_get_task(); rustrt::task_clear_event_reject(this); - p.blocked_task = some(this); + p.header.blocked_task = some(this); loop { - let old_state = swap_state_acq((*p).state, + let old_state = swap_state_acq(p.header.state, blocked); #debug("%?", old_state); alt old_state { empty { #debug("no data available on %?, going to sleep.", p_); rustrt::task_wait_event(this); - #debug("woke up, p.state = %?", p.state); - if p.state == full { + #debug("woke up, p.state = %?", p.header.state); + if p.header.state == full { let mut payload = none; payload <-> (*p).payload; - p.state = terminated; + p.header.state = terminated; ret some(option::unwrap(payload)) } } @@ -125,7 +132,7 @@ fn recv(-p: recv_packet) -> option { full { let mut payload = none; payload <-> (*p).payload; - p.state = terminated; + p.header.state = terminated; ret some(option::unwrap(payload)) } terminated { @@ -138,7 +145,7 @@ fn recv(-p: recv_packet) -> option { fn sender_terminate(p: *packet) { let p = unsafe { uniquify(p) }; - alt swap_state_rel((*p).state, terminated) { + alt swap_state_rel(p.header.state, terminated) { empty | blocked { // The receiver will eventually clean up. unsafe { forget(p) } @@ -155,7 +162,7 @@ fn sender_terminate(p: *packet) { fn receiver_terminate(p: *packet) { let p = unsafe { uniquify(p) }; - alt swap_state_rel((*p).state, terminated) { + alt swap_state_rel(p.header.state, terminated) { empty { // the sender will clean up unsafe { forget(p) } @@ -170,15 +177,106 @@ fn receiver_terminate(p: *packet) { } } +impl private_methods for packet_header { + // Returns the old state. + fn mark_blocked(this: *rust_task) -> state { + self.blocked_task = some(this); + swap_state_acq(self.state, blocked) + } + + fn unblock() { + alt swap_state_acq(self.state, empty) { + empty | blocked { } + terminated { self.state = terminated; } + full { self.state = full; } + } + } +} + +#[doc = "Returns when one of the packet headers reports data is +available."] +fn wait_many(pkts: ~[&a.packet_header]) -> uint { + let this = rustrt::rust_get_task(); + + rustrt::task_clear_event_reject(this); + let mut data_avail = false; + let mut ready_packet = pkts.len(); + for pkts.eachi |i, p| { + let old = p.mark_blocked(this); + alt old { + full | terminated { + data_avail = true; + ready_packet = i; + p.state = old; + break; + } + blocked { fail "blocking on blocked packet" } + empty { } + } + } + + while !data_avail { + #debug("sleeping on %? packets", pkts.len()); + let event = rustrt::task_wait_event(this) as *packet_header; + let pos = vec::position(pkts, |p| ptr::addr_of(*p) == event); + + alt pos { + some(i) { + ready_packet = i; + data_avail = true; + } + none { + #debug("ignoring spurious event, %?", event); + } + } + } + + #debug("%?", pkts[ready_packet]); + + for pkts.each |p| { p.unblock() } + + #debug("%?, %?", ready_packet, pkts[ready_packet]); + + assert pkts[ready_packet].state == full + || pkts[ready_packet].state == terminated; + + ready_packet +} + +#[doc = "Waits on a set of endpoints. Returns a message, its index, + and a list of the remaining endpoints."] +fn select(+endpoints: ~[recv_packet]) + -> (uint, option, ~[recv_packet]) +{ + let endpoints = vec::map_consume( + endpoints, + |p| unsafe { uniquify(p.unwrap()) }); + let endpoints_r = vec::view(endpoints, 0, endpoints.len()); + let ready = wait_many(endpoints_r.map_r(|p| &p.header)); + let mut remaining = ~[]; + let mut result = none; + do vec::consume(endpoints) |i, p| { + let p = recv_packet(unsafe { unsafe::transmute(p) }); + if i == ready { + result = recv(p); + } + else { + vec::push(remaining, p); + } + } + + (ready, result, remaining) +} + class send_packet { let mut p: option<*packet>; new(p: *packet) { - //#error("take send %?", p); + //#debug("take send %?", p); self.p = some(p); } drop { //if self.p != none { - // #error("drop send %?", option::get(self.p)); + // #debug("drop send %?", option::get(self.p)); //} if self.p != none { let mut p = none; @@ -196,12 +294,12 @@ fn unwrap() -> *packet { class recv_packet { let mut p: option<*packet>; new(p: *packet) { - //#error("take recv %?", p); + //#debug("take recv %?", p); self.p = some(p); } drop { //if self.p != none { - // #error("drop recv %?", option::get(self.p)); + // #debug("drop recv %?", option::get(self.p)); //} if self.p != none { let mut p = none; @@ -222,7 +320,7 @@ fn entangle() -> (send_packet, recv_packet) { } fn spawn_service( - init: native fn() -> (send_packet, recv_packet), + init: extern fn() -> (send_packet, recv_packet), +service: fn~(+recv_packet)) -> send_packet { @@ -241,7 +339,7 @@ fn spawn_service( } fn spawn_service_recv( - init: native fn() -> (recv_packet, send_packet), + init: extern fn() -> (recv_packet, send_packet), +service: fn~(+send_packet)) -> recv_packet { diff --git a/src/libcore/vec.rs b/src/libcore/vec.rs index b3b9d089fea..eb3d52edc24 100644 --- a/src/libcore/vec.rs +++ b/src/libcore/vec.rs @@ -6,6 +6,7 @@ export append; export append_one; +export consume; export init_op; export is_empty; export is_not_empty; @@ -40,6 +41,7 @@ export map; export mapi; export map2; +export map_consume; export flat_map; export filter_map; export filter; @@ -261,8 +263,8 @@ fn from_mut(+v: ~[mut T]) -> ~[T] { ret result; } -/// Return a slice that points into another slice. -pure fn view(v: &[const T], start: uint, end: uint) -> &a.[T] { +#[doc = "Return a slice that points into another slice."] +pure fn view(v: &[const T], start: uint, end: uint) -> &a.[T] { assert (start <= end); assert (end <= len(v)); do unpack_slice(v) |p, _len| { @@ -373,7 +375,7 @@ fn rsplitn(v: &[T], n: uint, f: fn(T) -> bool) -> ~[~[T]] { /// Removes the first element from a vector and return it fn shift(&v: ~[T]) -> T { let ln = len::(v); - assert (ln > 0u); + assert (ln > 0); let mut vv = ~[]; v <-> vv; @@ -384,12 +386,12 @@ fn shift(&v: ~[T]) -> T { let vv = unsafe::to_ptr(vv); rr <- *vv; - for uint::range(1u, ln) |i| { + for uint::range(1, ln) |i| { let r <- *ptr::offset(vv, i); push(v, r); } } - unsafe::set_len(vv, 0u); + unsafe::set_len(vv, 0); rr } @@ -404,6 +406,17 @@ fn unshift(&v: ~[T], +x: T) { } } +fn consume(+v: ~[T], f: fn(uint, +T)) unsafe { + do unpack_slice(v) |p, ln| { + for uint::range(0, ln) |i| { + let x <- *ptr::offset(p, i); + f(i, x); + } + } + + unsafe::set_len(v, 0); +} + /// Remove the last element from a vector and return it fn pop(&v: ~[const T]) -> T { let ln = len(v); @@ -575,6 +588,14 @@ fn grow_set(&v: ~[mut T], index: uint, initval: T, val: T) { ret result; } +fn map_consume(+v: ~[T], f: fn(+T) -> U) -> ~[U] { + let mut result = ~[]; + do consume(v) |_i, x| { + vec::push(result, f(x)); + } + result +} + /// Apply a function to each element of a vector and return the results pure fn mapi(v: &[T], f: fn(uint, T) -> U) -> ~[U] { let mut result = ~[]; @@ -1277,6 +1298,18 @@ impl extensions/& for &[T] { pure fn mapi(f: fn(uint, T) -> U) -> ~[U] { mapi(self, f) } + + #[inline] + fn map_r(f: fn(x: &self.T) -> U) -> ~[U] { + let mut r = ~[]; + let mut i = 0; + while i < self.len() { + push(r, f(&self[i])); + i += 1; + } + r + } + /** * Returns true if the function returns true for all elements. * diff --git a/src/test/bench/msgsend-ring-contracts.rs b/src/test/bench/msgsend-ring-contracts.rs index 99265353fe5..9ad3025d33a 100644 --- a/src/test/bench/msgsend-ring-contracts.rs +++ b/src/test/bench/msgsend-ring-contracts.rs @@ -119,7 +119,7 @@ fn main(args: [str]/~) { thread_ring(0u, msg_per_task, option::unwrap(num_chan), num_port); // synchronize - for futures.each |f| { f.get() }; + for futures.each |f| { future::get(f) }; let stop = time::precise_time_s(); diff --git a/src/test/run-pass/pipe-manual-2.rs b/src/test/run-pass/pipe-manual-2.rs index 0619a7b6b44..0803bbfe531 100644 --- a/src/test/run-pass/pipe-manual-2.rs +++ b/src/test/run-pass/pipe-manual-2.rs @@ -13,184 +13,6 @@ */ -// Hopefully someday we'll move this into core. -mod pipes { - import unsafe::{forget, reinterpret_cast}; - - enum state { - empty, - full, - blocked, - terminated - } - - type packet = { - mut state: state, - mut blocked_task: option, - mut payload: option - }; - - fn packet() -> *packet unsafe { - let p: *packet = unsafe::transmute(~{ - mut state: empty, - mut blocked_task: none::, - mut payload: none:: - }); - p - } - - #[abi = "rust-intrinsic"] - native mod rusti { - fn atomic_xchng(&dst: int, src: int) -> int; - fn atomic_xchng_acq(&dst: int, src: int) -> int; - fn atomic_xchng_rel(&dst: int, src: int) -> int; - } - - // We should consider moving this to core::unsafe, although I - // suspect graydon would want us to use void pointers instead. - unsafe fn uniquify(x: *T) -> ~T { - unsafe { unsafe::reinterpret_cast(x) } - } - - fn swap_state_acq(&dst: state, src: state) -> state { - unsafe { - reinterpret_cast(rusti::atomic_xchng_acq( - *(ptr::mut_addr_of(dst) as *mut int), - src as int)) - } - } - - fn swap_state_rel(&dst: state, src: state) -> state { - unsafe { - reinterpret_cast(rusti::atomic_xchng_rel( - *(ptr::mut_addr_of(dst) as *mut int), - src as int)) - } - } - - fn send(-p: send_packet, -payload: T) { - let p = p.unwrap(); - let p = unsafe { uniquify(p) }; - assert (*p).payload == none; - (*p).payload <- some(payload); - let old_state = swap_state_rel((*p).state, full); - alt old_state { - empty { - // Yay, fastpath. - - // The receiver will eventually clean this up. - unsafe { forget(p); } - } - full { fail "duplicate send" } - blocked { - // FIXME: once the target will actually block, tell the - // scheduler to wake it up. - - // The receiver will eventually clean this up. - unsafe { forget(p); } - } - terminated { - // The receiver will never receive this. Rely on drop_glue - // to clean everything up. - } - } - } - - fn recv(-p: recv_packet) -> option { - let p = p.unwrap(); - let p = unsafe { uniquify(p) }; - loop { - let old_state = swap_state_acq((*p).state, - blocked); - alt old_state { - empty | blocked { task::yield(); } - full { - let mut payload = none; - payload <-> (*p).payload; - ret some(option::unwrap(payload)) - } - terminated { - assert old_state == terminated; - ret none; - } - } - } - } - - fn sender_terminate(p: *packet) { - let p = unsafe { uniquify(p) }; - alt swap_state_rel((*p).state, terminated) { - empty | blocked { - // The receiver will eventually clean up. - unsafe { forget(p) } - } - full { - // This is impossible - fail "you dun goofed" - } - terminated { - // I have to clean up, use drop_glue - } - } - } - - fn receiver_terminate(p: *packet) { - let p = unsafe { uniquify(p) }; - alt swap_state_rel((*p).state, terminated) { - empty { - // the sender will clean up - unsafe { forget(p) } - } - blocked { - // this shouldn't happen. - fail "terminating a blocked packet" - } - terminated | full { - // I have to clean up, use drop_glue - } - } - } - - class send_packet { - let mut p: option<*packet>; - new(p: *packet) { self.p = some(p); } - drop { - if self.p != none { - let mut p = none; - p <-> self.p; - sender_terminate(option::unwrap(p)) - } - } - fn unwrap() -> *packet { - let mut p = none; - p <-> self.p; - option::unwrap(p) - } - } - - class recv_packet { - let mut p: option<*packet>; - new(p: *packet) { self.p = some(p); } - drop { - if self.p != none { - let mut p = none; - p <-> self.p; - receiver_terminate(option::unwrap(p)) - } - } - fn unwrap() -> *packet { - let mut p = none; - p <-> self.p; - option::unwrap(p) - } - } - - fn entangle() -> (send_packet, recv_packet) { - let p = packet(); - (send_packet(p), recv_packet(p)) - } -} - mod pingpong { enum ping = *pipes::packet; enum pong = *pipes::packet; diff --git a/src/test/run-pass/pipe-manual-3.rs b/src/test/run-pass/pipe-manual-3.rs index 905063ea713..9f05038aadc 100644 --- a/src/test/run-pass/pipe-manual-3.rs +++ b/src/test/run-pass/pipe-manual-3.rs @@ -15,184 +15,6 @@ */ -// Hopefully someday we'll move this into core. -mod pipes { - import unsafe::{forget, reinterpret_cast}; - - enum state { - empty, - full, - blocked, - terminated - } - - type packet = { - mut state: state, - mut blocked_task: option, - mut payload: option - }; - - fn packet() -> *packet unsafe { - let p: *packet = unsafe::transmute(~{ - mut state: empty, - mut blocked_task: none::, - mut payload: none:: - }); - p - } - - #[abi = "rust-intrinsic"] - native mod rusti { - fn atomic_xchng(&dst: int, src: int) -> int; - fn atomic_xchng_acq(&dst: int, src: int) -> int; - fn atomic_xchng_rel(&dst: int, src: int) -> int; - } - - // We should consider moving this to core::unsafe, although I - // suspect graydon would want us to use void pointers instead. - unsafe fn uniquify(x: *T) -> ~T { - unsafe { unsafe::reinterpret_cast(x) } - } - - fn swap_state_acq(&dst: state, src: state) -> state { - unsafe { - reinterpret_cast(rusti::atomic_xchng_acq( - *(ptr::mut_addr_of(dst) as *mut int), - src as int)) - } - } - - fn swap_state_rel(&dst: state, src: state) -> state { - unsafe { - reinterpret_cast(rusti::atomic_xchng_rel( - *(ptr::mut_addr_of(dst) as *mut int), - src as int)) - } - } - - fn send(-p: send_packet, -payload: T) { - let p = p.unwrap(); - let p = unsafe { uniquify(p) }; - assert (*p).payload == none; - (*p).payload <- some(payload); - let old_state = swap_state_rel((*p).state, full); - alt old_state { - empty { - // Yay, fastpath. - - // The receiver will eventually clean this up. - unsafe { forget(p); } - } - full { fail "duplicate send" } - blocked { - // FIXME: once the target will actually block, tell the - // scheduler to wake it up. - - // The receiver will eventually clean this up. - unsafe { forget(p); } - } - terminated { - // The receiver will never receive this. Rely on drop_glue - // to clean everything up. - } - } - } - - fn recv(-p: recv_packet) -> option { - let p = p.unwrap(); - let p = unsafe { uniquify(p) }; - loop { - let old_state = swap_state_acq((*p).state, - blocked); - alt old_state { - empty | blocked { task::yield(); } - full { - let mut payload = none; - payload <-> (*p).payload; - ret some(option::unwrap(payload)) - } - terminated { - assert old_state == terminated; - ret none; - } - } - } - } - - fn sender_terminate(p: *packet) { - let p = unsafe { uniquify(p) }; - alt swap_state_rel((*p).state, terminated) { - empty | blocked { - // The receiver will eventually clean up. - unsafe { forget(p) } - } - full { - // This is impossible - fail "you dun goofed" - } - terminated { - // I have to clean up, use drop_glue - } - } - } - - fn receiver_terminate(p: *packet) { - let p = unsafe { uniquify(p) }; - alt swap_state_rel((*p).state, terminated) { - empty { - // the sender will clean up - unsafe { forget(p) } - } - blocked { - // this shouldn't happen. - fail "terminating a blocked packet" - } - terminated | full { - // I have to clean up, use drop_glue - } - } - } - - class send_packet { - let mut p: option<*packet>; - new(p: *packet) { self.p = some(p); } - drop { - if self.p != none { - let mut p = none; - p <-> self.p; - sender_terminate(option::unwrap(p)) - } - } - fn unwrap() -> *packet { - let mut p = none; - p <-> self.p; - option::unwrap(p) - } - } - - class recv_packet { - let mut p: option<*packet>; - new(p: *packet) { self.p = some(p); } - drop { - if self.p != none { - let mut p = none; - p <-> self.p; - receiver_terminate(option::unwrap(p)) - } - } - fn unwrap() -> *packet { - let mut p = none; - p <-> self.p; - option::unwrap(p) - } - } - - fn entangle() -> (send_packet, recv_packet) { - let p = packet(); - (send_packet(p), recv_packet(p)) - } -} - mod pingpong { enum ping { ping, } enum ping_message = *pipes::packet; diff --git a/src/test/run-pass/pipe-select.rs b/src/test/run-pass/pipe-select.rs new file mode 100644 index 00000000000..163cfa95488 --- /dev/null +++ b/src/test/run-pass/pipe-select.rs @@ -0,0 +1,125 @@ +use std; +import std::timer::sleep; +import std::uv; + +import pipes::{recv, select}; + +// Compiled by pipec +mod oneshot { + fn init() -> (client::waiting, server::waiting) { pipes::entangle() } + enum waiting { signal(server::signaled), } + enum signaled { } + mod client { + fn signal(-pipe: waiting) -> signaled { + let (c, s) = pipes::entangle(); + let message = oneshot::signal(s); + pipes::send(pipe, message); + c + } + type waiting = pipes::send_packet; + type signaled = pipes::send_packet; + } + mod server { + impl recv for waiting { + fn recv() -> extern fn(-waiting) -> oneshot::waiting { + fn recv(-pipe: waiting) -> oneshot::waiting { + option::unwrap(pipes::recv(pipe)) + } + recv + } + } + type waiting = pipes::recv_packet; + impl recv for signaled { + fn recv() -> extern fn(-signaled) -> oneshot::signaled { + fn recv(-pipe: signaled) -> oneshot::signaled { + option::unwrap(pipes::recv(pipe)) + } + recv + } + } + type signaled = pipes::recv_packet; + } +} + +mod stream { + fn init() -> (client::stream, server::stream) { + pipes::entangle() + } + enum stream { send(T, server::stream), } + mod client { + fn send(+pipe: stream, +x_0: T) -> stream { + { + let (c, s) = pipes::entangle(); + let message = stream::send(x_0, s); + pipes::send(pipe, message); + c + } + } + type stream = pipes::send_packet>; + } + mod server { + impl recv for stream { + fn recv() -> extern fn(+stream) -> stream::stream { + fn recv(+pipe: stream) -> stream::stream { + option::unwrap(pipes::recv(pipe)) + } + recv + } + } + type stream = pipes::recv_packet>; + } +} + +fn main() { + import oneshot::client::*; + import stream::client::*; + + let iotask = uv::global_loop::get(); + + #macro[ + [#recv[chan], + chan.recv()(chan)] + ]; + + let c = pipes::spawn_service(stream::init, |p| { + #error("waiting for pipes"); + let stream::send(x, p) = option::unwrap(recv(p)); + #error("got pipes"); + let (left, right) : (oneshot::server::waiting, + oneshot::server::waiting) + = x; + #error("selecting"); + let (i, _, _) = select(~[left, right]); + #error("selected"); + assert i == 0; + + #error("waiting for pipes"); + let stream::send(x, _) = option::unwrap(recv(p)); + #error("got pipes"); + let (left, right) : (oneshot::server::waiting, + oneshot::server::waiting) + = x; + #error("selecting"); + let (i, _, _) = select(~[left, right]); + #error("selected"); + assert i == 1; + }); + + let (c1, p1) = oneshot::init(); + let (c2, p2) = oneshot::init(); + + let c = send(c, (p1, p2)); + + sleep(iotask, 1000); + + signal(c1); + + let (c1, p1) = oneshot::init(); + let (c2, p2) = oneshot::init(); + + send(c, (p1, p2)); + + sleep(iotask, 1000); + + signal(c2); +} \ No newline at end of file -- GitLab