提交 6a44482b 编写于 作者: P Patrick Walton

libcore: Remove mutable fields from pipes

上级 803a4f45
......@@ -12,6 +12,7 @@
Message passing
*/
use cast::transmute;
use cast;
use either::{Either, Left, Right};
use kinds::Owned;
......@@ -192,9 +193,9 @@ impl<T: Owned> Peekable<T> for Port<T> {
fn peek(&self) -> bool {
let mut endp = None;
endp <-> self.endp;
let peek = match &endp {
&Some(ref endp) => peek(endp),
&None => fail!(~"peeking empty stream")
let peek = match endp {
Some(ref mut endp) => peek(endp),
None => fail!(~"peeking empty stream")
};
self.endp <-> endp;
peek
......@@ -202,10 +203,10 @@ fn peek(&self) -> bool {
}
impl<T: Owned> Selectable for Port<T> {
fn header(&self) -> *PacketHeader {
fn header(&mut self) -> *mut PacketHeader {
unsafe {
match self.endp {
Some(ref endp) => endp.header(),
Some(ref mut endp) => endp.header(),
None => fail!(~"peeking empty stream")
}
}
......@@ -327,23 +328,20 @@ fn clone(&self) -> SharedChan<T> {
#[allow(non_camel_case_types)]
pub mod oneshot {
priv use core::kinds::Owned;
use ptr::to_unsafe_ptr;
use ptr::to_mut_unsafe_ptr;
pub fn init<T: Owned>() -> (client::Oneshot<T>, server::Oneshot<T>) {
pub use core::pipes::HasBuffer;
let buffer =
~::core::pipes::Buffer{
let mut buffer = ~::core::pipes::Buffer {
header: ::core::pipes::BufferHeader(),
data: __Buffer{
data: __Buffer {
Oneshot: ::core::pipes::mk_packet::<Oneshot<T>>()
},
};
do ::core::pipes::entangle_buffer(buffer) |buffer, data| {
{
data.Oneshot.set_buffer(buffer);
to_unsafe_ptr(&data.Oneshot)
}
data.Oneshot.set_buffer(buffer);
to_mut_unsafe_ptr(&mut data.Oneshot)
}
}
#[allow(non_camel_case_types)]
......@@ -497,48 +495,66 @@ pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T) -> bool {
/// Returns the index of an endpoint that is ready to receive.
pub fn selecti<T: Selectable>(endpoints: &[T]) -> uint {
pub fn selecti<T: Selectable>(endpoints: &mut [T]) -> uint {
wait_many(endpoints)
}
/// Returns 0 or 1 depending on which endpoint is ready to receive
pub fn select2i<A: Selectable, B: Selectable>(a: &A, b: &B) ->
Either<(), ()> {
match wait_many([a.header(), b.header()]) {
0 => Left(()),
1 => Right(()),
_ => fail!(~"wait returned unexpected index")
pub fn select2i<A:Selectable, B:Selectable>(a: &mut A, b: &mut B)
-> Either<(), ()> {
let mut endpoints = [ a.header(), b.header() ];
match wait_many(endpoints) {
0 => Left(()),
1 => Right(()),
_ => fail!(~"wait returned unexpected index"),
}
}
/// Receive a message from one of two endpoints.
pub trait Select2<T: Owned, U: Owned> {
/// Receive a message or return `None` if a connection closes.
fn try_select(&self) -> Either<Option<T>, Option<U>>;
fn try_select(&mut self) -> Either<Option<T>, Option<U>>;
/// Receive a message or fail if a connection closes.
fn select(&self) -> Either<T, U>;
fn select(&mut self) -> Either<T, U>;
}
impl<T: Owned, U: Owned,
Left: Selectable + GenericPort<T>,
Right: Selectable + GenericPort<U>>
Select2<T, U> for (Left, Right) {
fn select(&self) -> Either<T, U> {
match *self {
(ref lp, ref rp) => match select2i(lp, rp) {
Left(()) => Left (lp.recv()),
Right(()) => Right(rp.recv())
}
impl<T:Owned,
U:Owned,
Left:Selectable + GenericPort<T>,
Right:Selectable + GenericPort<U>>
Select2<T, U>
for (Left, Right) {
fn select(&mut self) -> Either<T, U> {
// XXX: Bad borrow check workaround.
unsafe {
let this: &(Left, Right) = transmute(self);
match *this {
(ref lp, ref rp) => {
let lp: &mut Left = transmute(lp);
let rp: &mut Right = transmute(rp);
match select2i(lp, rp) {
Left(()) => Left(lp.recv()),
Right(()) => Right(rp.recv()),
}
}
}
}
}
fn try_select(&self) -> Either<Option<T>, Option<U>> {
match *self {
(ref lp, ref rp) => match select2i(lp, rp) {
Left(()) => Left (lp.try_recv()),
Right(()) => Right(rp.try_recv())
}
fn try_select(&mut self) -> Either<Option<T>, Option<U>> {
// XXX: Bad borrow check workaround.
unsafe {
let this: &(Left, Right) = transmute(self);
match *this {
(ref lp, ref rp) => {
let lp: &mut Left = transmute(lp);
let rp: &mut Right = transmute(rp);
match select2i(lp, rp) {
Left(()) => Left (lp.try_recv()),
Right(()) => Right(rp.try_recv()),
}
}
}
}
}
}
......
......@@ -111,7 +111,7 @@ enum State {
pub struct BufferHeader {
// Tracks whether this buffer needs to be freed. We can probably
// get away with restricting it to 0 or 1, if we're careful.
mut ref_count: int,
ref_count: int,
// We may want a drop, and to be careful about stringing this
// thing along.
......@@ -130,12 +130,12 @@ pub struct Buffer<T> {
}
pub struct PacketHeader {
mut state: State,
mut blocked_task: *rust_task,
state: State,
blocked_task: *rust_task,
// This is a transmute_copy of a ~buffer, that can also be cast
// to a buffer_header if need be.
mut buffer: *libc::c_void,
buffer: *libc::c_void,
}
pub fn PacketHeader() -> PacketHeader {
......@@ -148,14 +148,14 @@ pub fn PacketHeader() -> PacketHeader {
pub impl PacketHeader {
// Returns the old state.
unsafe fn mark_blocked(&self, this: *rust_task) -> State {
unsafe fn mark_blocked(&mut self, this: *rust_task) -> State {
rustrt::rust_task_ref(this);
let old_task = swap_task(&mut self.blocked_task, this);
assert!(old_task.is_null());
swap_state_acq(&mut self.state, Blocked)
}
unsafe fn unblock(&self) {
unsafe fn unblock(&mut self) {
let old_task = swap_task(&mut self.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::rust_task_deref(old_task)
......@@ -169,13 +169,13 @@ unsafe fn unblock(&self) {
// unsafe because this can do weird things to the space/time
// continuum. It ends making multiple unique pointers to the same
// thing. You'll proobably want to forget them when you're done.
unsafe fn buf_header(&self) -> ~BufferHeader {
// thing. You'll probably want to forget them when you're done.
unsafe fn buf_header(&mut self) -> ~BufferHeader {
assert!(self.buffer.is_not_null());
transmute_copy(&self.buffer)
}
fn set_buffer<T:Owned>(&self, b: ~Buffer<T>) {
fn set_buffer<T:Owned>(&mut self, b: ~Buffer<T>) {
unsafe {
self.buffer = transmute_copy(&b);
}
......@@ -184,15 +184,15 @@ fn set_buffer<T:Owned>(&self, b: ~Buffer<T>) {
pub struct Packet<T> {
header: PacketHeader,
mut payload: Option<T>,
payload: Option<T>,
}
pub trait HasBuffer {
fn set_buffer(&self, b: *libc::c_void);
fn set_buffer(&mut self, b: *libc::c_void);
}
impl<T:Owned> HasBuffer for Packet<T> {
fn set_buffer(&self, b: *libc::c_void) {
fn set_buffer(&mut self, b: *libc::c_void) {
self.header.buffer = b;
}
}
......@@ -204,7 +204,7 @@ pub fn mk_packet<T:Owned>() -> Packet<T> {
}
}
fn unibuffer<T>() -> ~Buffer<Packet<T>> {
let b = ~Buffer {
let mut b = ~Buffer {
header: BufferHeader(),
data: Packet {
header: PacketHeader(),
......@@ -218,22 +218,25 @@ fn unibuffer<T>() -> ~Buffer<Packet<T>> {
b
}
pub fn packet<T>() -> *Packet<T> {
let b = unibuffer();
let p = ptr::to_unsafe_ptr(&(b.data));
pub fn packet<T>() -> *mut Packet<T> {
let mut b = unibuffer();
let p = ptr::to_mut_unsafe_ptr(&mut b.data);
// We'll take over memory management from here.
unsafe { forget(b) }
unsafe {
forget(b);
}
p
}
pub fn entangle_buffer<T:Owned,Tstart:Owned>(
buffer: ~Buffer<T>,
init: &fn(*libc::c_void, x: &T) -> *Packet<Tstart>)
-> (SendPacketBuffered<Tstart, T>, RecvPacketBuffered<Tstart, T>)
{
let p = init(unsafe { transmute_copy(&buffer) }, &buffer.data);
unsafe { forget(buffer) }
(SendPacketBuffered(p), RecvPacketBuffered(p))
mut buffer: ~Buffer<T>,
init: &fn(*libc::c_void, x: &mut T) -> *mut Packet<Tstart>)
-> (SendPacketBuffered<Tstart, T>, RecvPacketBuffered<Tstart, T>) {
unsafe {
let p = init(transmute_copy(&buffer), &mut buffer.data);
forget(buffer);
(SendPacketBuffered(p), RecvPacketBuffered(p))
}
}
pub fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task {
......@@ -292,7 +295,7 @@ fn swap_state_rel(dst: &mut State, src: State) -> State {
}
}
pub unsafe fn get_buffer<T>(p: *PacketHeader) -> ~Buffer<T> {
pub unsafe fn get_buffer<T>(p: *mut PacketHeader) -> ~Buffer<T> {
transmute((*p).buf_header())
}
......@@ -306,10 +309,14 @@ struct BufferResource<T> {
impl<T> Drop for BufferResource<T> {
fn finalize(&self) {
unsafe {
let b = move_it!(self.buffer);
let this: &mut BufferResource<T> = transmute(self);
let mut b = move_it!(this.buffer);
//let p = ptr::to_unsafe_ptr(*b);
//error!("drop %?", p);
let old_count = intrinsics::atomic_xsub_rel(&mut b.header.ref_count, 1);
let old_count = intrinsics::atomic_xsub_rel(
&mut b.header.ref_count,
1);
//let old_count = atomic_xchng_rel(b.header.ref_count, 0);
if old_count == 1 {
// The new count is 0.
......@@ -323,10 +330,12 @@ fn finalize(&self) {
}
}
fn BufferResource<T>(b: ~Buffer<T>) -> BufferResource<T> {
fn BufferResource<T>(mut b: ~Buffer<T>) -> BufferResource<T> {
//let p = ptr::to_unsafe_ptr(*b);
//error!("take %?", p);
unsafe { intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1) };
unsafe {
intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1);
}
BufferResource {
// tjc: ????
......@@ -334,10 +343,12 @@ fn BufferResource<T>(b: ~Buffer<T>) -> BufferResource<T> {
}
}
pub fn send<T,Tbuffer>(p: SendPacketBuffered<T,Tbuffer>, payload: T) -> bool {
pub fn send<T,Tbuffer>(mut p: SendPacketBuffered<T,Tbuffer>,
payload: T)
-> bool {
let header = p.header();
let p_ = p.unwrap();
let p = unsafe { &*p_ };
let mut p_ = p.unwrap();
let p = unsafe { &mut *p_ };
assert!(ptr::to_unsafe_ptr(&(p.header)) == header);
assert!(p.payload.is_none());
p.payload = Some(payload);
......@@ -391,11 +402,12 @@ pub fn recv<T:Owned,Tbuffer:Owned>(
a message, or `Some(T)` if a message was received.
*/
pub fn try_recv<T:Owned,Tbuffer:Owned>(p: RecvPacketBuffered<T, Tbuffer>)
-> Option<T>
{
let p_ = p.unwrap();
let p = unsafe { &*p_ };
pub fn try_recv<T:Owned,Tbuffer:Owned>(mut p: RecvPacketBuffered<T, Tbuffer>)
-> Option<T> {
let mut p_ = p.unwrap();
let mut p = unsafe {
&mut *p_
};
do (|| {
try_recv_(p)
......@@ -412,7 +424,7 @@ pub fn try_recv<T:Owned,Tbuffer:Owned>(p: RecvPacketBuffered<T, Tbuffer>)
}
}
fn try_recv_<T:Owned>(p: &Packet<T>) -> Option<T> {
fn try_recv_<T:Owned>(p: &mut Packet<T>) -> Option<T> {
// optimistic path
match p.header.state {
Full => {
......@@ -498,16 +510,20 @@ fn try_recv_<T:Owned>(p: &Packet<T>) -> Option<T> {
}
/// Returns true if messages are available.
pub fn peek<T:Owned,Tb:Owned>(p: &RecvPacketBuffered<T, Tb>) -> bool {
match unsafe {(*p.header()).state} {
Empty | Terminated => false,
Blocked => fail!(~"peeking on blocked packet"),
Full => true
pub fn peek<T:Owned,Tb:Owned>(p: &mut RecvPacketBuffered<T, Tb>) -> bool {
unsafe {
match (*p.header()).state {
Empty | Terminated => false,
Blocked => fail!(~"peeking on blocked packet"),
Full => true
}
}
}
fn sender_terminate<T:Owned>(p: *Packet<T>) {
let p = unsafe { &*p };
fn sender_terminate<T:Owned>(p: *mut Packet<T>) {
let p = unsafe {
&mut *p
};
match swap_state_rel(&mut p.header.state, Terminated) {
Empty => {
// The receiver will eventually clean up.
......@@ -536,8 +552,10 @@ fn sender_terminate<T:Owned>(p: *Packet<T>) {
}
}
fn receiver_terminate<T:Owned>(p: *Packet<T>) {
let p = unsafe { &*p };
fn receiver_terminate<T:Owned>(p: *mut Packet<T>) {
let p = unsafe {
&mut *p
};
match swap_state_rel(&mut p.header.state, Terminated) {
Empty => {
assert!(p.header.blocked_task.is_null());
......@@ -569,8 +587,10 @@ fn receiver_terminate<T:Owned>(p: *Packet<T>) {
closed by the sender or has a message waiting to be received.
*/
pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint {
let this = unsafe { rustrt::rust_get_task() };
pub fn wait_many<T: Selectable>(pkts: &mut [T]) -> uint {
let this = unsafe {
rustrt::rust_get_task()
};
unsafe {
rustrt::task_clear_event_reject(this);
......@@ -578,19 +598,19 @@ pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint {
let mut data_avail = false;
let mut ready_packet = pkts.len();
for pkts.eachi |i, p| {
for vec::eachi_mut(pkts) |i, p| {
unsafe {
let p = &*p.header();
let p = &mut *p.header();
let old = p.mark_blocked(this);
match old {
Full | Terminated => {
data_avail = true;
ready_packet = i;
(*p).state = old;
break;
}
Blocked => fail!(~"blocking on blocked packet"),
Empty => ()
Full | Terminated => {
data_avail = true;
ready_packet = i;
(*p).state = old;
break;
}
Blocked => fail!(~"blocking on blocked packet"),
Empty => ()
}
}
}
......@@ -598,7 +618,14 @@ pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint {
while !data_avail {
debug!("sleeping on %? packets", pkts.len());
let event = wait_event(this) as *PacketHeader;
let pos = vec::position(pkts, |p| p.header() == event);
let mut pos = None;
for vec::eachi_mut(pkts) |i, p| {
if p.header() == event {
pos = Some(i);
break;
}
};
match pos {
Some(i) => {
......@@ -609,11 +636,15 @@ pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint {
}
}
debug!("%?", pkts[ready_packet]);
debug!("%?", &mut pkts[ready_packet]);
for pkts.each |p| { unsafe{ (*p.header()).unblock()} }
for vec::each_mut(pkts) |p| {
unsafe {
(*p.header()).unblock()
}
}
debug!("%?, %?", ready_packet, pkts[ready_packet]);
debug!("%?, %?", ready_packet, &mut pkts[ready_packet]);
unsafe {
assert!((*pkts[ready_packet].header()).state == Full
......@@ -629,65 +660,58 @@ pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint {
*/
pub type SendPacket<T> = SendPacketBuffered<T, Packet<T>>;
pub fn SendPacket<T>(p: *Packet<T>) -> SendPacket<T> {
pub fn SendPacket<T>(p: *mut Packet<T>) -> SendPacket<T> {
SendPacketBuffered(p)
}
pub struct SendPacketBuffered<T, Tbuffer> {
mut p: Option<*Packet<T>>,
mut buffer: Option<BufferResource<Tbuffer>>,
p: Option<*mut Packet<T>>,
buffer: Option<BufferResource<Tbuffer>>,
}
#[unsafe_destructor]
impl<T:Owned,Tbuffer:Owned> Drop for SendPacketBuffered<T,Tbuffer> {
fn finalize(&self) {
//if self.p != none {
// debug!("drop send %?", option::get(self.p));
//}
if self.p != None {
let mut p = None;
p <-> self.p;
sender_terminate(p.unwrap())
unsafe {
let this: &mut SendPacketBuffered<T,Tbuffer> = transmute(self);
if this.p != None {
let mut p = None;
p <-> this.p;
sender_terminate(p.unwrap())
}
}
//unsafe { error!("send_drop: %?",
// if self.buffer == none {
// "none"
// } else { "some" }); }
}
}
pub fn SendPacketBuffered<T,Tbuffer>(p: *Packet<T>)
-> SendPacketBuffered<T, Tbuffer> {
//debug!("take send %?", p);
pub fn SendPacketBuffered<T,Tbuffer>(p: *mut Packet<T>)
-> SendPacketBuffered<T,Tbuffer> {
SendPacketBuffered {
p: Some(p),
buffer: unsafe {
Some(BufferResource(
get_buffer(ptr::to_unsafe_ptr(&((*p).header)))))
Some(BufferResource(get_buffer(&mut (*p).header)))
}
}
}
pub impl<T,Tbuffer> SendPacketBuffered<T,Tbuffer> {
fn unwrap(&self) -> *Packet<T> {
fn unwrap(&mut self) -> *mut Packet<T> {
let mut p = None;
p <-> self.p;
p.unwrap()
}
fn header(&self) -> *PacketHeader {
fn header(&mut self) -> *mut PacketHeader {
match self.p {
Some(packet) => unsafe {
let packet = &*packet;
let header = ptr::to_unsafe_ptr(&(packet.header));
//forget(packet);
header
},
None => fail!(~"packet already consumed")
Some(packet) => unsafe {
let packet = &mut *packet;
let header = ptr::to_mut_unsafe_ptr(&mut packet.header);
header
},
None => fail!(~"packet already consumed")
}
}
fn reuse_buffer(&self) -> BufferResource<Tbuffer> {
fn reuse_buffer(&mut self) -> BufferResource<Tbuffer> {
//error!("send reuse_buffer");
let mut tmp = None;
tmp <-> self.buffer;
......@@ -699,41 +723,37 @@ fn reuse_buffer(&self) -> BufferResource<Tbuffer> {
/// message.
pub type RecvPacket<T> = RecvPacketBuffered<T, Packet<T>>;
pub fn RecvPacket<T>(p: *Packet<T>) -> RecvPacket<T> {
pub fn RecvPacket<T>(p: *mut Packet<T>) -> RecvPacket<T> {
RecvPacketBuffered(p)
}
pub struct RecvPacketBuffered<T, Tbuffer> {
mut p: Option<*Packet<T>>,
mut buffer: Option<BufferResource<Tbuffer>>,
p: Option<*mut Packet<T>>,
buffer: Option<BufferResource<Tbuffer>>,
}
#[unsafe_destructor]
impl<T:Owned,Tbuffer:Owned> Drop for RecvPacketBuffered<T,Tbuffer> {
fn finalize(&self) {
//if self.p != none {
// debug!("drop recv %?", option::get(self.p));
//}
if self.p != None {
let mut p = None;
p <-> self.p;
receiver_terminate(p.unwrap())
unsafe {
let this: &mut RecvPacketBuffered<T,Tbuffer> = transmute(self);
if this.p != None {
let mut p = None;
p <-> this.p;
receiver_terminate(p.unwrap())
}
}
//unsafe { error!("recv_drop: %?",
// if self.buffer == none {
// "none"
// } else { "some" }); }
}
}
pub impl<T:Owned,Tbuffer:Owned> RecvPacketBuffered<T, Tbuffer> {
fn unwrap(&self) -> *Packet<T> {
fn unwrap(&mut self) -> *mut Packet<T> {
let mut p = None;
p <-> self.p;
p.unwrap()
}
fn reuse_buffer(&self) -> BufferResource<Tbuffer> {
//error!("recv reuse_buffer");
fn reuse_buffer(&mut self) -> BufferResource<Tbuffer> {
let mut tmp = None;
tmp <-> self.buffer;
tmp.unwrap()
......@@ -741,27 +761,24 @@ fn reuse_buffer(&self) -> BufferResource<Tbuffer> {
}
impl<T:Owned,Tbuffer:Owned> Selectable for RecvPacketBuffered<T, Tbuffer> {
fn header(&self) -> *PacketHeader {
fn header(&mut self) -> *mut PacketHeader {
match self.p {
Some(packet) => unsafe {
let packet = &*packet;
let header = ptr::to_unsafe_ptr(&(packet.header));
//forget(packet);
header
},
None => fail!(~"packet already consumed")
Some(packet) => unsafe {
let packet = &mut *packet;
let header = ptr::to_mut_unsafe_ptr(&mut packet.header);
header
},
None => fail!(~"packet already consumed")
}
}
}
pub fn RecvPacketBuffered<T,Tbuffer>(p: *Packet<T>)
-> RecvPacketBuffered<T,Tbuffer> {
//debug!("take recv %?", p);
pub fn RecvPacketBuffered<T,Tbuffer>(p: *mut Packet<T>)
-> RecvPacketBuffered<T,Tbuffer> {
RecvPacketBuffered {
p: Some(p),
buffer: unsafe {
Some(BufferResource(
get_buffer(ptr::to_unsafe_ptr(&((*p).header)))))
Some(BufferResource(get_buffer(&mut (*p).header)))
}
}
}
......@@ -800,51 +817,55 @@ pub fn entangle<T>() -> (SendPacket<T>, RecvPacket<T>) {
*/
pub fn select2<A:Owned,Ab:Owned,B:Owned,Bb:Owned>(
a: RecvPacketBuffered<A, Ab>,
b: RecvPacketBuffered<B, Bb>)
mut a: RecvPacketBuffered<A, Ab>,
mut b: RecvPacketBuffered<B, Bb>)
-> Either<(Option<A>, RecvPacketBuffered<B, Bb>),
(RecvPacketBuffered<A, Ab>, Option<B>)>
{
let i = wait_many([a.header(), b.header()]);
(RecvPacketBuffered<A, Ab>, Option<B>)> {
let mut endpoints = [ a.header(), b.header() ];
let i = wait_many(endpoints);
match i {
0 => Left((try_recv(a), b)),
1 => Right((a, try_recv(b))),
_ => fail!(~"select2 return an invalid packet")
0 => Left((try_recv(a), b)),
1 => Right((a, try_recv(b))),
_ => fail!(~"select2 return an invalid packet")
}
}
pub trait Selectable {
fn header(&self) -> *PacketHeader;
fn header(&mut self) -> *mut PacketHeader;
}
impl Selectable for *PacketHeader {
fn header(&self) -> *PacketHeader { *self }
impl Selectable for *mut PacketHeader {
fn header(&mut self) -> *mut PacketHeader { *self }
}
/// Returns the index of an endpoint that is ready to receive.
pub fn selecti<T:Selectable>(endpoints: &[T]) -> uint {
pub fn selecti<T:Selectable>(endpoints: &mut [T]) -> uint {
wait_many(endpoints)
}
/// Returns 0 or 1 depending on which endpoint is ready to receive
pub fn select2i<A:Selectable,B:Selectable>(a: &A, b: &B) ->
Either<(), ()> {
match wait_many([a.header(), b.header()]) {
0 => Left(()),
1 => Right(()),
_ => fail!(~"wait returned unexpected index")
pub fn select2i<A:Selectable,B:Selectable>(a: &mut A, b: &mut B)
-> Either<(), ()> {
let mut endpoints = [ a.header(), b.header() ];
match wait_many(endpoints) {
0 => Left(()),
1 => Right(()),
_ => fail!(~"wait returned unexpected index")
}
}
/** Waits on a set of endpoints. Returns a message, its index, and a
list of the remaining endpoints.
/// Waits on a set of endpoints. Returns a message, its index, and a
/// list of the remaining endpoints.
pub fn select<T:Owned,Tb:Owned>(mut endpoints: ~[RecvPacketBuffered<T, Tb>])
-> (uint,
Option<T>,
~[RecvPacketBuffered<T, Tb>]) {
let mut endpoint_headers = ~[];
for vec::each_mut(endpoints) |endpoint| {
endpoint_headers.push(endpoint.header());
}
*/
pub fn select<T:Owned,Tb:Owned>(endpoints: ~[RecvPacketBuffered<T, Tb>])
-> (uint, Option<T>, ~[RecvPacketBuffered<T, Tb>])
{
let ready = wait_many(endpoints.map(|p| p.header()));
let ready = wait_many(endpoint_headers);
let mut remaining = endpoints;
let port = remaining.swap_remove(ready);
let result = try_recv(port);
......
......@@ -72,7 +72,7 @@ fn peek(&self) -> bool {
}
impl<T:Owned,U:Owned> Selectable for DuplexStream<T, U> {
fn header(&self) -> *pipes::PacketHeader {
fn header(&mut self) -> *mut pipes::PacketHeader {
self.port.header()
}
}
......
......@@ -14,10 +14,11 @@
use uv::iotask;
use uv::iotask::IoTask;
use core::libc;
use core::libc::c_void;
use core::cast::transmute;
use core::cast;
use core::comm::{stream, Chan, SharedChan, Port, select2i};
use core::libc::c_void;
use core::libc;
/**
* Wait for timeout period then send provided value over a channel
......@@ -120,22 +121,28 @@ pub fn sleep(iotask: &IoTask, msecs: uint) {
pub fn recv_timeout<T:Copy + Owned>(iotask: &IoTask,
msecs: uint,
wait_po: &Port<T>)
-> Option<T> {
let (timeout_po, timeout_ch) = stream::<()>();
-> Option<T> {
let mut (timeout_po, timeout_ch) = stream::<()>();
delayed_send(iotask, msecs, &timeout_ch, ());
// FIXME: This could be written clearer (#2618)
either::either(
|_| {
None
}, |_| {
Some(wait_po.recv())
}, &select2i(&timeout_po, wait_po)
)
// XXX: Workaround due to ports and channels not being &mut. They should
// be.
unsafe {
let wait_po = cast::transmute_mut(wait_po);
// FIXME: This could be written clearer (#2618)
either::either(
|_| {
None
}, |_| {
Some(wait_po.recv())
}, &select2i(&mut timeout_po, wait_po)
)
}
}
// INTERNAL API
extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t,
status: libc::c_int) {
extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t, status: libc::c_int) {
unsafe {
debug!(
"delayed_send_cb handle %? status %?", handle, status);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册