pipes.rs 25.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

/*! Runtime support for message passing with protocol enforcement.


Pipes consist of two endpoints. One endpoint can send messages and
the other can receive messages. The set of legal messages and which
directions they can flow at any given point are determined by a
protocol. Below is an example protocol.

~~~ {.rust}
proto! pingpong (
    ping: send {
        ping -> pong
    }
    pong: recv {
        pong -> ping
    }
)
~~~

The `proto!` syntax extension will convert this into a module called
`pingpong`, which includes a set of types and functions that can be
used to write programs that follow the pingpong protocol.

*/

/* IMPLEMENTATION NOTES

The initial design for this feature is available at:

https://github.com/eholk/rust/wiki/Proposal-for-channel-contracts

Much of the design in that document is still accurate. There are
several components for the pipe implementation. First of all is the
syntax extension. To see how that works, it is best to see the comments
in libsyntax/ext/pipes.rs.

This module includes two related pieces of the runtime
implementation: support for unbounded and bounded
protocols. The main difference between the two is the type of the
buffer that is carried along in the endpoint data structures.


The heart of the implementation is the packet type. It contains a
header and a payload field. Much of the code in this module deals with
the header field. This is where the synchronization information is
stored. In the case of a bounded protocol, the header also includes a
pointer to the buffer the packet is contained in.

Packets represent a single message in a protocol. The payload field
gets instantiated at the type of the message, which is usually an enum
generated by the pipe compiler. Packets are conceptually single use,
although in bounded protocols they are reused each time around the
loop.


Packets are usually handled through a send_packet_buffered or
recv_packet_buffered object. Each packet is referenced by one
send_packet and one recv_packet, and these wrappers enforce that only
one end can send and only one end can receive. The structs also
include a destructor that marks packets as terminated if the sender
or receiver destroys the object before sending or receiving a value.

The *_packet_buffered structs take two type parameters. The first is
the message type for the current packet (or state). The second
represents the type of the whole buffer. For bounded protocols, the
protocol compiler generates a struct with a field for each protocol
state. This generated struct is used as the buffer type parameter. For
unbounded protocols, the buffer is simply one packet, so there are
shorthand types called send_packet and recv_packet, where the buffer
type is just `packet<T>`. Using the same underlying structure for both
bounded and unbounded protocols allows for less code duplication.

*/
84

85 86
#[allow(missing_doc)];

87
use container::Container;
88
use cast::{forget, transmute, transmute_copy};
P
Patrick Walton 已提交
89
use either::{Either, Left, Right};
90
use kinds::Owned;
91
use libc;
92
use ops::Drop;
93
use option::{None, Option, Some};
94
use unstable::finally::Finally;
95
use unstable::intrinsics;
96
use ptr;
D
Daniel Micay 已提交
97
use ptr::RawPtr;
98 99
use task;
use vec;
100
use vec::OwnedVector;
A
Alex Crichton 已提交
101
use util::replace;
102

103
// Number of times `try_recv_` yields (via `task::yield`) on an empty
// packet before it falls back to sleeping in `wait_event`. With the
// value 0 the receiver goes to sleep immediately.
static SPIN_COUNT: uint = 0;
104

P
Paul Stansifer 已提交
105
macro_rules! move_it (
106
    { $x:expr } => ( unsafe { let y = *ptr::to_unsafe_ptr(&($x)); y } )
P
Paul Stansifer 已提交
107
)
108

109
#[deriving(Eq)]
110 111 112 113 114
enum State {
    Empty,
    Full,
    Blocked,
    Terminated
115 116
}

117
pub struct BufferHeader {
118 119
    // Tracks whether this buffer needs to be freed. We can probably
    // get away with restricting it to 0 or 1, if we're careful.
120
    ref_count: int,
121 122 123 124 125

    // We may want a drop, and to be careful about stringing this
    // thing along.
}

E
Erick Tryzelaar 已提交
126
pub fn BufferHeader() -> BufferHeader {
B
Brian Anderson 已提交
127 128 129 130 131
    BufferHeader {
        ref_count: 0
    }
}

132
// This is for protocols to associate extra data to thread around.
133
pub struct Buffer<T> {
134 135 136
    header: BufferHeader,
    data: T,
}
E
Eric Holk 已提交
137

138
pub struct PacketHeader {
139 140
    state: State,
    blocked_task: *rust_task,
141

142
    // This is a transmute_copy of a ~buffer, that can also be cast
143
    // to a buffer_header if need be.
144
    buffer: *libc::c_void,
145 146
}

147
pub fn PacketHeader() -> PacketHeader {
148 149 150 151 152 153
    PacketHeader {
        state: Empty,
        blocked_task: ptr::null(),
        buffer: ptr::null()
    }
}
154

155
impl PacketHeader {
156
    // Returns the old state.
157
    pub unsafe fn mark_blocked(&mut self, this: *rust_task) -> State {
158
        rustrt::rust_task_ref(this);
E
Eric Holk 已提交
159
        let old_task = swap_task(&mut self.blocked_task, this);
P
Patrick Walton 已提交
160
        assert!(old_task.is_null());
161
        swap_state_acq(&mut self.state, Blocked)
162 163
    }

164
    pub unsafe fn unblock(&mut self) {
E
Eric Holk 已提交
165
        let old_task = swap_task(&mut self.blocked_task, ptr::null());
166
        if !old_task.is_null() {
167
            rustrt::rust_task_deref(old_task)
168
        }
169 170 171 172
        match swap_state_acq(&mut self.state, Empty) {
          Empty | Blocked => (),
          Terminated => self.state = Terminated,
          Full => self.state = Full
173 174 175 176 177
        }
    }

    // unsafe because this can do weird things to the space/time
    // continuum. It ends making multiple unique pointers to the same
178
    // thing. You'll probably want to forget them when you're done.
179
    pub unsafe fn buf_header(&mut self) -> ~BufferHeader {
P
Patrick Walton 已提交
180
        assert!(self.buffer.is_not_null());
181
        transmute_copy(&self.buffer)
182
    }
183

184
    pub fn set_buffer<T:Owned>(&mut self, b: ~Buffer<T>) {
185
        unsafe {
186
            self.buffer = transmute_copy(&b);
187
        }
188
    }
189 190
}

191 192
pub struct Packet<T> {
    header: PacketHeader,
193
    payload: Option<T>,
194
}
195

196
pub trait HasBuffer {
197
    fn set_buffer(&mut self, b: *libc::c_void);
198 199
}

200
impl<T:Owned> HasBuffer for Packet<T> {
201
    fn set_buffer(&mut self, b: *libc::c_void) {
202 203 204 205
        self.header.buffer = b;
    }
}

206
pub fn mk_packet<T:Owned>() -> Packet<T> {
207
    Packet {
208
        header: PacketHeader(),
209
        payload: None,
210 211
    }
}
212
// Allocates a buffer that consists of exactly one empty packet and
// points that packet's header back at the buffer that contains it.
fn unibuffer<T>() -> ~Buffer<Packet<T>> {
    let mut b = ~Buffer {
        header: BufferHeader(),
        data: Packet {
            header: PacketHeader(),
            payload: None,
        }
    };

    unsafe {
        // Store a raw copy of the owning pointer inside the packet, so
        // the buffer can later be recovered from the header alone (see
        // `get_buffer`).
        b.data.header.buffer = transmute_copy(&b);
    }
    b
}

227 228 229
pub fn packet<T>() -> *mut Packet<T> {
    let mut b = unibuffer();
    let p = ptr::to_mut_unsafe_ptr(&mut b.data);
230
    // We'll take over memory management from here.
231 232 233
    unsafe {
        forget(b);
    }
234 235
    p
}
236

237
pub fn entangle_buffer<T:Owned,Tstart:Owned>(
238 239
    mut buffer: ~Buffer<T>,
    init: &fn(*libc::c_void, x: &mut T) -> *mut Packet<Tstart>)
240
    -> (RecvPacketBuffered<Tstart, T>, SendPacketBuffered<Tstart, T>) {
241 242 243
    unsafe {
        let p = init(transmute_copy(&buffer), &mut buffer.data);
        forget(buffer);
244
        (RecvPacketBuffered(p), SendPacketBuffered(p))
245
    }
246 247
}

T
Tim Chevalier 已提交
248
pub fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task {
249 250 251
    // It might be worth making both acquire and release versions of
    // this.
    unsafe {
252
        transmute(intrinsics::atomic_xchg(transmute(dst), src as int))
253 254 255
    }
}

256
#[allow(non_camel_case_types)]
257
pub type rust_task = libc::c_void;
E
Eric Holk 已提交
258

259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
pub mod rustrt {
    use libc;
    use super::rust_task;

    pub extern {
        #[rust_stack]
        unsafe fn rust_get_task() -> *rust_task;
        #[rust_stack]
        unsafe fn rust_task_ref(task: *rust_task);
        unsafe fn rust_task_deref(task: *rust_task);

        #[rust_stack]
        unsafe fn task_clear_event_reject(task: *rust_task);

        unsafe fn task_wait_event(this: *rust_task,
                                  killed: &mut *libc::c_void)
                               -> bool;
        unsafe fn task_signal_event(target: *rust_task, event: *libc::c_void);
    }
278 279
}

280
fn wait_event(this: *rust_task) -> *libc::c_void {
281 282
    unsafe {
        let mut event = ptr::null();
283

284 285
        let killed = rustrt::task_wait_event(this, &mut event);
        if killed && !task::failing() {
286
            fail!("killed")
287 288
        }
        event
289 290 291
    }
}

T
Tim Chevalier 已提交
292
fn swap_state_acq(dst: &mut State, src: State) -> State {
293
    unsafe {
294
        transmute(intrinsics::atomic_xchg_acq(transmute(dst), src as int))
295 296 297
    }
}

T
Tim Chevalier 已提交
298
fn swap_state_rel(dst: &mut State, src: State) -> State {
299
    unsafe {
300
        transmute(intrinsics::atomic_xchg_rel(transmute(dst), src as int))
301 302 303
    }
}

304
/// Recovers the owning buffer pointer stored in the header `p`.
///
/// This goes through `buf_header`, so it asserts that a buffer is
/// attached and it creates an additional unique pointer to the same
/// buffer; the caller is responsible for making sure the buffer is only
/// freed once.
pub unsafe fn get_buffer<T>(p: *mut PacketHeader) -> ~Buffer<T> {
    transmute((*p).buf_header())
}
307

308
// This could probably be done with SharedMutableState to avoid move_it!().
309 310
struct BufferResource<T> {
    buffer: ~Buffer<T>,
311

312 313
}

314
#[unsafe_destructor]
315
impl<T> Drop for BufferResource<T> {
316
    fn finalize(&self) {
317
        unsafe {
318 319 320
            let this: &mut BufferResource<T> = transmute(self);

            let mut b = move_it!(this.buffer);
321
            //let p = ptr::to_unsafe_ptr(*b);
322
            //error!("drop %?", p);
323 324 325
            let old_count = intrinsics::atomic_xsub_rel(
                &mut b.header.ref_count,
                1);
326 327 328 329 330 331 332
            //let old_count = atomic_xchng_rel(b.header.ref_count, 0);
            if old_count == 1 {
                // The new count is 0.

                // go go gadget drop glue
            }
            else {
L
Luqman Aden 已提交
333
                forget(b)
334 335 336 337 338
            }
        }
    }
}

339
fn BufferResource<T>(mut b: ~Buffer<T>) -> BufferResource<T> {
340
    //let p = ptr::to_unsafe_ptr(*b);
341
    //error!("take %?", p);
342 343 344
    unsafe {
        intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1);
    }
345 346 347

    BufferResource {
        // tjc: ????
L
Luqman Aden 已提交
348
        buffer: b
349 350
    }
}
B
Brian Anderson 已提交
351

352 353 354
pub fn send<T,Tbuffer>(mut p: SendPacketBuffered<T,Tbuffer>,
                       payload: T)
                       -> bool {
355
    let header = p.header();
T
Tim Chevalier 已提交
356
    let p_ = p.unwrap();
357
    let p = unsafe { &mut *p_ };
358
    assert_eq!(ptr::to_unsafe_ptr(&(p.header)), header);
P
Patrick Walton 已提交
359
    assert!(p.payload.is_none());
L
Luqman Aden 已提交
360
    p.payload = Some(payload);
361 362 363 364 365 366 367 368 369
    let old_state = swap_state_rel(&mut p.header.state, Full);
    match old_state {
        Empty => {
            // Yay, fastpath.

            // The receiver will eventually clean this up.
            //unsafe { forget(p); }
            return true;
        }
370
        Full => fail!("duplicate send"),
371 372 373 374 375 376 377
        Blocked => {
            debug!("waking up task for %?", p_);
            let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
            if !old_task.is_null() {
                unsafe {
                    rustrt::task_signal_event(
                        old_task,
378
                        ptr::to_unsafe_ptr(&(p.header)) as *libc::c_void);
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
                    rustrt::rust_task_deref(old_task);
                }
            }

            // The receiver will eventually clean this up.
            //unsafe { forget(p); }
            return true;
        }
        Terminated => {
            // The receiver will never receive this. Rely on drop_glue
            // to clean everything up.
            return false;
        }
    }
}
394

E
Eric Holk 已提交
395 396 397 398 399
/** Receives a message from a pipe.

Fails if the sender closes the connection.

*/
400
pub fn recv<T:Owned,Tbuffer:Owned>(
B
Brian Anderson 已提交
401
    p: RecvPacketBuffered<T, Tbuffer>) -> T {
L
Luqman Aden 已提交
402
    try_recv(p).expect("connection closed")
403 404
}

E
Eric Holk 已提交
405 406
/** Attempts to receive a message from a pipe.

B
Brian Anderson 已提交
407
Returns `None` if the sender has closed the connection without sending
B
Brian Anderson 已提交
408
a message, or `Some(T)` if a message was received.
E
Eric Holk 已提交
409 410

*/
411 412
pub fn try_recv<T:Owned,Tbuffer:Owned>(mut p: RecvPacketBuffered<T, Tbuffer>)
                                       -> Option<T> {
T
Tim Chevalier 已提交
413 414
    let p_ = p.unwrap();
    let p = unsafe { &mut *p_ };
415

416 417 418 419 420 421 422 423 424
    do (|| {
        try_recv_(p)
    }).finally {
        unsafe {
            if task::failing() {
                p.header.state = Terminated;
                let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
                if !old_task.is_null() {
                    rustrt::rust_task_deref(old_task);
425 426 427
                }
            }
        }
428
    }
429
}
430

431
fn try_recv_<T:Owned>(p: &mut Packet<T>) -> Option<T> {
432 433
    // optimistic path
    match p.header.state {
434
      Full => {
A
Alex Crichton 已提交
435
        let payload = replace(&mut p.payload, None);
436
        p.header.state = Empty;
437
        return Some(payload.unwrap())
438
      },
439
      Terminated => return None,
440 441 442 443
      _ => {}
    }

    // regular path
444 445 446 447 448
    let this = unsafe { rustrt::rust_get_task() };
    unsafe {
        rustrt::task_clear_event_reject(this);
        rustrt::rust_task_ref(this);
    };
T
Tim Chevalier 已提交
449 450
    debug!("blocked = %x this = %x", p.header.blocked_task as uint,
           this as uint);
E
Eric Holk 已提交
451
    let old_task = swap_task(&mut p.header.blocked_task, this);
T
Tim Chevalier 已提交
452 453 454
    debug!("blocked = %x this = %x old_task = %x",
           p.header.blocked_task as uint,
           this as uint, old_task as uint);
P
Patrick Walton 已提交
455
    assert!(old_task.is_null());
456
    let mut first = true;
457
    let mut count = SPIN_COUNT;
458
    loop {
459 460 461 462
        unsafe {
            rustrt::task_clear_event_reject(this);
        }

E
Eric Holk 已提交
463
        let old_state = swap_state_acq(&mut p.header.state,
464
                                       Blocked);
465
        match old_state {
466
          Empty => {
467
            debug!("no data available on %?, going to sleep.", p);
468 469 470 471 472 473 474 475 476 477 478 479
            if count == 0 {
                wait_event(this);
            }
            else {
                count -= 1;
                // FIXME (#524): Putting the yield here destroys a lot
                // of the benefit of spinning, since we still go into
                // the scheduler at every iteration. However, without
                // this everything spins too much because we end up
                // sometimes blocking the thing we are waiting on.
                task::yield();
            }
P
Paul Stansifer 已提交
480
            debug!("woke up, p.state = %?", copy p.header.state);
481
          }
482
          Blocked => if first {
483
            fail!("blocking on already blocked packet")
484
          },
485
          Full => {
A
Alex Crichton 已提交
486
            let payload = replace(&mut p.payload, None);
E
Eric Holk 已提交
487
            let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
488
            if !old_task.is_null() {
489 490 491
                unsafe {
                    rustrt::rust_task_deref(old_task);
                }
492
            }
493
            p.header.state = Empty;
494
            return Some(payload.unwrap())
495
          }
496
          Terminated => {
E
Eric Holk 已提交
497 498
            // This assert detects when we've accidentally unsafely
            // casted too big of a number to a state.
499
            assert_eq!(old_state, Terminated);
500

E
Eric Holk 已提交
501
            let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
502
            if !old_task.is_null() {
503 504 505
                unsafe {
                    rustrt::rust_task_deref(old_task);
                }
506
            }
B
Brian Anderson 已提交
507
            return None;
508 509
          }
        }
510
        first = false;
511 512 513
    }
}

E
Eric Holk 已提交
514
/// Returns true if messages are available.
515 516 517 518
pub fn peek<T:Owned,Tb:Owned>(p: &mut RecvPacketBuffered<T, Tb>) -> bool {
    unsafe {
        match (*p.header()).state {
            Empty | Terminated => false,
519
            Blocked => fail!("peeking on blocked packet"),
520 521
            Full => true
        }
E
Eric Holk 已提交
522 523 524
    }
}

525 526 527 528
fn sender_terminate<T:Owned>(p: *mut Packet<T>) {
    let p = unsafe {
        &mut *p
    };
529 530
    match swap_state_rel(&mut p.header.state, Terminated) {
      Empty => {
531 532
        // The receiver will eventually clean up.
      }
533
      Blocked => {
534
        // wake up the target
E
Eric Holk 已提交
535
        let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
536
        if !old_task.is_null() {
537 538 539
            unsafe {
                rustrt::task_signal_event(
                    old_task,
540
                    ptr::to_unsafe_ptr(&(p.header)) as *libc::c_void);
541 542
                rustrt::rust_task_deref(old_task);
            }
E
Eric Holk 已提交
543
        }
544 545
        // The receiver will eventually clean up.
      }
546
      Full => {
547
        // This is impossible
548
        fail!("you dun goofed")
549
      }
550
      Terminated => {
P
Patrick Walton 已提交
551
        assert!(p.header.blocked_task.is_null());
552 553 554 555 556
        // I have to clean up, use drop_glue
      }
    }
}

557 558 559 560
fn receiver_terminate<T:Owned>(p: *mut Packet<T>) {
    let p = unsafe {
        &mut *p
    };
561 562
    match swap_state_rel(&mut p.header.state, Terminated) {
      Empty => {
P
Patrick Walton 已提交
563
        assert!(p.header.blocked_task.is_null());
564 565
        // the sender will clean up
      }
566
      Blocked => {
E
Eric Holk 已提交
567
        let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
568
        if !old_task.is_null() {
569 570
            unsafe {
                rustrt::rust_task_deref(old_task);
571
                assert_eq!(old_task, rustrt::rust_get_task());
572
            }
573
        }
574
      }
575
      Terminated | Full => {
P
Patrick Walton 已提交
576
        assert!(p.header.blocked_task.is_null());
577 578 579 580 581
        // I have to clean up, use drop_glue
      }
    }
}

E
Eric Holk 已提交
582 583 584 585 586 587 588 589 590 591
/** Returns when one of the packet headers reports data is available.

This function is primarily intended for building higher level waiting
functions, such as `select`, `select2`, etc.

It takes a vector slice of packet_headers and returns an index into
that vector. The index points to an endpoint that has either been
closed by the sender or has a message waiting to be received.

*/
592 593 594 595
pub fn wait_many<T: Selectable>(pkts: &mut [T]) -> uint {
    let this = unsafe {
        rustrt::rust_get_task()
    };
596 597 598 599

    unsafe {
        rustrt::task_clear_event_reject(this);
    }
E
Eric Holk 已提交
600 601 602

    let mut data_avail = false;
    let mut ready_packet = pkts.len();
603
    for vec::eachi_mut(pkts) |i, p| {
604
        unsafe {
605
            let p = &mut *p.header();
606 607
            let old = p.mark_blocked(this);
            match old {
608 609 610 611 612 613
                Full | Terminated => {
                    data_avail = true;
                    ready_packet = i;
                    (*p).state = old;
                    break;
                }
614
                Blocked => fail!("blocking on blocked packet"),
615
                Empty => ()
616
            }
E
Eric Holk 已提交
617 618 619 620
        }
    }

    while !data_avail {
P
Paul Stansifer 已提交
621
        debug!("sleeping on %? packets", pkts.len());
622
        let event = wait_event(this) as *PacketHeader;
623 624 625 626 627 628 629 630

        let mut pos = None;
        for vec::eachi_mut(pkts) |i, p| {
            if p.header() == event {
                pos = Some(i);
                break;
            }
        };
E
Eric Holk 已提交
631

632
        match pos {
B
Brian Anderson 已提交
633
          Some(i) => {
E
Eric Holk 已提交
634 635 636
            ready_packet = i;
            data_avail = true;
          }
B
Brian Anderson 已提交
637
          None => debug!("ignoring spurious event, %?", event)
E
Eric Holk 已提交
638 639 640
        }
    }

641
    debug!("%?", &mut pkts[ready_packet]);
E
Eric Holk 已提交
642

643 644 645 646 647
    for vec::each_mut(pkts) |p| {
        unsafe {
            (*p.header()).unblock()
        }
    }
E
Eric Holk 已提交
648

649
    debug!("%?, %?", ready_packet, &mut pkts[ready_packet]);
E
Eric Holk 已提交
650

651
    unsafe {
P
Patrick Walton 已提交
652
        assert!((*pkts[ready_packet].header()).state == Full
653
                     || (*pkts[ready_packet].header()).state == Terminated);
654
    }
E
Eric Holk 已提交
655 656 657 658

    ready_packet
}

659 660 661 662
/** The sending end of a pipe. It can be used to send exactly one
message.

*/
663
pub type SendPacket<T> = SendPacketBuffered<T, Packet<T>>;
664

665
pub fn SendPacket<T>(p: *mut Packet<T>) -> SendPacket<T> {
666 667
    SendPacketBuffered(p)
}
668

669
pub struct SendPacketBuffered<T, Tbuffer> {
670 671
    p: Option<*mut Packet<T>>,
    buffer: Option<BufferResource<Tbuffer>>,
672
}
673

674
#[unsafe_destructor]
675
impl<T:Owned,Tbuffer:Owned> Drop for SendPacketBuffered<T,Tbuffer> {
676
    fn finalize(&self) {
677 678 679
        unsafe {
            let this: &mut SendPacketBuffered<T,Tbuffer> = transmute(self);
            if this.p != None {
A
Alex Crichton 已提交
680
                let p = replace(&mut this.p, None);
681 682
                sender_terminate(p.unwrap())
            }
683 684 685 686
        }
    }
}

687 688
pub fn SendPacketBuffered<T,Tbuffer>(p: *mut Packet<T>)
                                     -> SendPacketBuffered<T,Tbuffer> {
689 690 691
    SendPacketBuffered {
        p: Some(p),
        buffer: unsafe {
692
            Some(BufferResource(get_buffer(&mut (*p).header)))
693 694 695 696
        }
    }
}

697 698
impl<T,Tbuffer> SendPacketBuffered<T,Tbuffer> {
    pub fn unwrap(&mut self) -> *mut Packet<T> {
A
Alex Crichton 已提交
699
        replace(&mut self.p, None).unwrap()
700
    }
701

702
    pub fn header(&mut self) -> *mut PacketHeader {
703
        match self.p {
704 705 706 707 708
            Some(packet) => unsafe {
                let packet = &mut *packet;
                let header = ptr::to_mut_unsafe_ptr(&mut packet.header);
                header
            },
709
            None => fail!("packet already consumed")
710 711
        }
    }
712

713
    pub fn reuse_buffer(&mut self) -> BufferResource<Tbuffer> {
P
Paul Stansifer 已提交
714
        //error!("send reuse_buffer");
A
Alex Crichton 已提交
715
        replace(&mut self.buffer, None).unwrap()
716
    }
717 718
}

E
Eric Holk 已提交
719 720
/// Represents the receive end of a pipe. It can receive exactly one
/// message.
721
pub type RecvPacket<T> = RecvPacketBuffered<T, Packet<T>>;
722

723
pub fn RecvPacket<T>(p: *mut Packet<T>) -> RecvPacket<T> {
724 725
    RecvPacketBuffered(p)
}
726

727
pub struct RecvPacketBuffered<T, Tbuffer> {
728 729
    p: Option<*mut Packet<T>>,
    buffer: Option<BufferResource<Tbuffer>>,
730 731
}

732
#[unsafe_destructor]
733
impl<T:Owned,Tbuffer:Owned> Drop for RecvPacketBuffered<T,Tbuffer> {
734
    fn finalize(&self) {
735 736 737
        unsafe {
            let this: &mut RecvPacketBuffered<T,Tbuffer> = transmute(self);
            if this.p != None {
A
Alex Crichton 已提交
738
                let p = replace(&mut this.p, None);
739 740
                receiver_terminate(p.unwrap())
            }
741 742 743
        }
    }
}
744

745 746
impl<T:Owned,Tbuffer:Owned> RecvPacketBuffered<T, Tbuffer> {
    pub fn unwrap(&mut self) -> *mut Packet<T> {
A
Alex Crichton 已提交
747
        replace(&mut self.p, None).unwrap()
748
    }
E
Eric Holk 已提交
749

750
    pub fn reuse_buffer(&mut self) -> BufferResource<Tbuffer> {
A
Alex Crichton 已提交
751
        replace(&mut self.buffer, None).unwrap()
752 753 754
    }
}

755
impl<T:Owned,Tbuffer:Owned> Selectable for RecvPacketBuffered<T, Tbuffer> {
756
    fn header(&mut self) -> *mut PacketHeader {
757
        match self.p {
758 759 760 761 762
            Some(packet) => unsafe {
                let packet = &mut *packet;
                let header = ptr::to_mut_unsafe_ptr(&mut packet.header);
                header
            },
763
            None => fail!("packet already consumed")
E
Eric Holk 已提交
764 765
        }
    }
766 767
}

768 769
pub fn RecvPacketBuffered<T,Tbuffer>(p: *mut Packet<T>)
                                     -> RecvPacketBuffered<T,Tbuffer> {
770 771 772
    RecvPacketBuffered {
        p: Some(p),
        buffer: unsafe {
773
            Some(BufferResource(get_buffer(&mut (*p).header)))
774 775 776
        }
    }
}
B
Brian Anderson 已提交
777

778
pub fn entangle<T>() -> (RecvPacket<T>, SendPacket<T>) {
779
    let p = packet();
780
    (RecvPacket(p), SendPacket(p))
781
}
782

B
Brian Anderson 已提交
783 784 785 786 787 788 789
/** Receives a message from one of two endpoints.

The return value is `left` if the first endpoint received something,
or `right` if the second endpoint receives something. In each case,
the result includes the other endpoint as well so it can be used
again. Below is an example of using `select2`.

790
~~~ {.rust}
B
Brian Anderson 已提交
791
match select2(a, b) {
792 793 794 795 796 797 798 799 800 801 802 803
    left((none, b)) {
        // endpoint a was closed.
    }
    right((a, none)) {
        // endpoint b was closed.
    }
    left((Some(_), b)) {
        // endpoint a received a message
    }
    right(a, Some(_)) {
        // endpoint b received a message.
    }
B
Brian Anderson 已提交
804 805 806 807 808 809 810 811
}
~~~

Sometimes messages will be available on both endpoints at once. In
this case, `select2` may return either `left` or `right`.

*/
pub fn select2<A:Owned,Ab:Owned,B:Owned,Bb:Owned>(
812 813
    mut a: RecvPacketBuffered<A, Ab>,
    mut b: RecvPacketBuffered<B, Bb>)
B
Brian Anderson 已提交
814
    -> Either<(Option<A>, RecvPacketBuffered<B, Bb>),
815 816 817
              (RecvPacketBuffered<A, Ab>, Option<B>)> {
    let mut endpoints = [ a.header(), b.header() ];
    let i = wait_many(endpoints);
B
Brian Anderson 已提交
818
    match i {
819 820
        0 => Left((try_recv(a), b)),
        1 => Right((a, try_recv(b))),
821
        _ => fail!("select2 return an invalid packet")
B
Brian Anderson 已提交
822 823 824 825
    }
}

pub trait Selectable {
826
    fn header(&mut self) -> *mut PacketHeader;
B
Brian Anderson 已提交
827 828
}

829 830
// A raw header pointer is selectable as-is; this is what lets `select2`,
// `select2i` and `select` pass collections of headers to `wait_many`.
impl Selectable for *mut PacketHeader {
    fn header(&mut self) -> *mut PacketHeader { *self }
}

/// Returns the index of an endpoint that is ready to receive.
834
pub fn selecti<T:Selectable>(endpoints: &mut [T]) -> uint {
B
Brian Anderson 已提交
835 836 837 838
    wait_many(endpoints)
}

/// Returns 0 or 1 depending on which endpoint is ready to receive
839 840 841 842 843 844
pub fn select2i<A:Selectable,B:Selectable>(a: &mut A, b: &mut B)
                                           -> Either<(), ()> {
    let mut endpoints = [ a.header(), b.header() ];
    match wait_many(endpoints) {
        0 => Left(()),
        1 => Right(()),
845
        _ => fail!("wait returned unexpected index")
B
Brian Anderson 已提交
846 847 848
    }
}

849 850 851 852 853 854 855 856 857 858
/// Waits on a set of endpoints. Returns the index of the endpoint that
/// became ready, the result of `try_recv` on it (`None` if it was closed
/// without a message), and a list of the remaining endpoints.
///
/// The ready endpoint is taken out with `swap_remove`, so presumably the
/// remaining endpoints are not guaranteed to keep their original order.
pub fn select<T:Owned,Tb:Owned>(mut endpoints: ~[RecvPacketBuffered<T, Tb>])
                                -> (uint,
                                    Option<T>,
                                    ~[RecvPacketBuffered<T, Tb>]) {
    let mut endpoint_headers = ~[];
    for vec::each_mut(endpoints) |endpoint| {
        endpoint_headers.push(endpoint.header());
    }

    let ready = wait_many(endpoint_headers);
    let mut remaining = endpoints;
    let port = remaining.swap_remove(ready);
    let result = try_recv(port);
    (ready, result, remaining)
}

867
pub mod rt {
868 869
    use option::{None, Option, Some};

870 871
    // These are used to hide the option constructors from the
    // compiler because their names are changing
L
Luqman Aden 已提交
872
    pub fn make_some<T>(val: T) -> Option<T> { Some(val) }
873
    pub fn make_none<T>() -> Option<T> { None }
874 875
}

E
Eric Holk 已提交
876
#[cfg(test)]
877
mod test {
A
Alex Crichton 已提交
878
    use either::Right;
879
    use comm::{Chan, Port, oneshot, recv_one, stream, Select2,
A
Alex Crichton 已提交
880
               GenericChan, Peekable};
881

E
Eric Holk 已提交
882
    #[test]
883
    fn test_select2() {
884 885
        let (p1, c1) = stream();
        let (p2, c2) = stream();
E
Eric Holk 已提交
886

E
Eric Holk 已提交
887
        c1.send(~"abc");
E
Eric Holk 已提交
888

P
Patrick Walton 已提交
889 890 891 892
        let mut tuple = (p1, p2);
        match tuple.select() {
            Right(_) => fail!(),
            _ => (),
E
Eric Holk 已提交
893 894 895 896
        }

        c2.send(123);
    }
E
Eric Holk 已提交
897 898

    #[test]
899
    fn test_oneshot() {
P
Patrick Walton 已提交
900
        let (p, c) = oneshot();
E
Eric Holk 已提交
901

P
Patrick Walton 已提交
902
        c.send(());
E
Eric Holk 已提交
903

L
Luqman Aden 已提交
904
        recv_one(p)
E
Eric Holk 已提交
905
    }
906 907 908

    #[test]
    fn test_peek_terminated() {
909
        let (port, chan): (Port<int>, Chan<int>) = stream();
910 911 912

        {
            // Destroy the channel
L
Luqman Aden 已提交
913
            let _chan = chan;
914 915
        }

P
Patrick Walton 已提交
916
        assert!(!port.peek());
917
    }
E
Eric Holk 已提交
918
}