// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

A
Alex Crichton 已提交
11
//! Multi-producer, single-consumer communication primitives for threads
A
Alex Crichton 已提交
12
//!
13
//! This module provides message-based communication over channels, concretely
14
//! defined among three types:
A
Alex Crichton 已提交
15
//!
16
//! * `Sender`
17
//! * `SyncSender`
18
//! * `Receiver`
A
Alex Crichton 已提交
19
//!
20
//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
A
Alex Crichton 已提交
21 22
//! senders are clone-able (multi-producer) such that many threads can send
//! simultaneously to one receiver (single-consumer).
A
Alex Crichton 已提交
23
//!
A
Alex Crichton 已提交
24
//! These channels come in two flavors:
25 26 27 28 29 30 31 32 33 34 35 36 37
//!
//! 1. An asynchronous, infinitely buffered channel. The `channel()` function
//!    will return a `(Sender, Receiver)` tuple where all sends will be
//!    **asynchronous** (they never block). The channel conceptually has an
//!    infinite buffer.
//!
//! 2. A synchronous, bounded channel. The `sync_channel()` function will return
//!    a `(SyncSender, Receiver)` tuple where the storage for pending messages
//!    is a pre-allocated buffer of a fixed size. All sends will be
//!    **synchronous** by blocking until there is buffer space available. Note
//!    that a bound of 0 is allowed, causing the channel to become a
//!    "rendezvous" channel where each sender atomically hands off a message to
//!    a receiver.
A
Alex Crichton 已提交
38
//!
A
Alex Crichton 已提交
39
//! ## Disconnection
A
Alex Crichton 已提交
40
//!
A
Alex Crichton 已提交
41 42 43 44
//! The send and receive operations on channels will all return a `Result`
//! indicating whether the operation succeeded or not. An unsuccessful operation
//! is normally indicative of the other half of a channel having "hung up" by
//! being dropped in its corresponding thread.
A
Alex Crichton 已提交
45
//!
A
Alex Crichton 已提交
46 47 48 49
//! Once half of a channel has been deallocated, most operations can no longer
//! continue to make progress, so `Err` will be returned. Many applications will
//! continue to `unwrap()` the results returned from this module, instigating a
//! propagation of failure among threads if one unexpectedly dies.
A
Alex Crichton 已提交
50
//!
A
Alex Crichton 已提交
51
//! # Examples
A
Alex Crichton 已提交
52
//!
53 54 55
//! Simple usage:
//!
//! ```
56
//! use std::thread::Thread;
A
Alex Crichton 已提交
57
//! use std::sync::mpsc::channel;
58
//!
A
Alex Crichton 已提交
59
//! // Create a simple streaming channel
60
//! let (tx, rx) = channel();
61
//! Thread::spawn(move|| {
A
Alex Crichton 已提交
62
//!     tx.send(10i).unwrap();
A
Aaron Turon 已提交
63
//! });
A
Alex Crichton 已提交
64
//! assert_eq!(rx.recv().unwrap(), 10i);
65 66 67
//! ```
//!
//! Shared usage:
A
Alex Crichton 已提交
68
//!
69
//! ```
70
//! use std::thread::Thread;
A
Alex Crichton 已提交
71
//! use std::sync::mpsc::channel;
72 73
//!
//! // Create a shared channel that can be sent along from many threads
R
Robert Clipsham 已提交
74 75
//! // where tx is the sending half (tx for transmission), and rx is the receiving
//! // half (rx for receiving).
76
//! let (tx, rx) = channel();
77
//! for i in range(0i, 10i) {
78
//!     let tx = tx.clone();
79
//!     Thread::spawn(move|| {
A
Alex Crichton 已提交
80
//!         tx.send(i).unwrap();
A
Aaron Turon 已提交
81
//!     });
A
Alex Crichton 已提交
82 83
//! }
//!
84
//! for _ in range(0i, 10i) {
A
Alex Crichton 已提交
85
//!     let j = rx.recv().unwrap();
A
Alex Crichton 已提交
86 87
//!     assert!(0 <= j && j < 10);
//! }
88
//! ```
A
Alex Crichton 已提交
89
//!
S
Steve Klabnik 已提交
90
//! Propagating panics:
91
//!
A
Alex Crichton 已提交
92 93 94 95 96
//! ```
//! use std::sync::mpsc::channel;
//!
//! // The call to recv() will return an error because the channel has already
//! // hung up (or been deallocated)
97 98
//! let (tx, rx) = channel::<int>();
//! drop(tx);
A
Alex Crichton 已提交
99
//! assert!(rx.recv().is_err());
A
Alex Crichton 已提交
100
//! ```
101 102 103 104
//!
//! Synchronous channels:
//!
//! ```
105
//! use std::thread::Thread;
A
Alex Crichton 已提交
106
//! use std::sync::mpsc::sync_channel;
107
//!
108
//! let (tx, rx) = sync_channel::<int>(0);
109
//! Thread::spawn(move|| {
110
//!     // This will wait for the parent task to start receiving
A
Alex Crichton 已提交
111
//!     tx.send(53).unwrap();
A
Aaron Turon 已提交
112
//! });
A
Alex Crichton 已提交
113
//! rx.recv().unwrap();
114
//! ```
115 116 117 118 119 120 121
//!
//! Reading from a channel with a timeout requires using a `Timer` together
//! with the channel. You can use the `select!` macro to select on either and
//! handle the timeout case. This first example will break out of the loop
//! after 10 seconds no matter what:
//!
//! ```no_run
A
Alex Crichton 已提交
122
//! use std::sync::mpsc::channel;
123
//! use std::io::timer::Timer;
124
//! use std::time::Duration;
125 126 127
//!
//! let (tx, rx) = channel::<int>();
//! let mut timer = Timer::new().unwrap();
128
//! let timeout = timer.oneshot(Duration::seconds(10));
129 130 131
//!
//! loop {
//!     select! {
A
Alex Crichton 已提交
132 133
//!         val = rx.recv() => println!("Received {}", val.unwrap()),
//!         _ = timeout.recv() => {
134
//!             println!("timed out, total time was more than 10 seconds");
135 136 137 138 139 140 141 142 143 144 145
//!             break;
//!         }
//!     }
//! }
//! ```
//!
//! This second example is more costly since it allocates a new timer every
//! time a message is received, but it allows you to timeout after the channel
//! has been inactive for 5 seconds:
//!
//! ```no_run
A
Alex Crichton 已提交
146
//! use std::sync::mpsc::channel;
147
//! use std::io::timer::Timer;
148
//! use std::time::Duration;
149 150 151 152 153
//!
//! let (tx, rx) = channel::<int>();
//! let mut timer = Timer::new().unwrap();
//!
//! loop {
154
//!     let timeout = timer.oneshot(Duration::seconds(5));
155 156
//!
//!     select! {
A
Alex Crichton 已提交
157 158
//!         val = rx.recv() => println!("Received {}", val.unwrap()),
//!         _ = timeout.recv() => {
159
//!             println!("timed out, no message received in 5 seconds");
160 161 162 163 164
//!             break;
//!         }
//!     }
//! }
//! ```
A
Alex Crichton 已提交
165

166 167
#![stable]

A
Alex Crichton 已提交
168 169 170 171 172 173 174 175 176
// A description of how Rust's channel implementation works
//
// Channels are supposed to be the basic building block for all other
// concurrent primitives that are used in Rust. As a result, the channel type
// needs to be highly optimized, flexible, and broad enough for use everywhere.
//
// The choice of implementation of all channels is to be built on lock-free data
// structures. The channels themselves are then consequently also lock-free data
// structures. As always with lock-free code, this is a very "here be dragons"
177
// territory, especially because I'm unaware of any academic papers that have
A
Alex Crichton 已提交
178 179 180 181
// gone into great length about channels of these flavors.
//
// ## Flavors of channels
//
182 183 184 185 186
// From the perspective of a consumer of this library, there is only one flavor
// of channel. This channel can be used as a stream and cloned to allow multiple
// senders. Under the hood, however, there are actually three flavors of
// channels in play.
//
N
Nick Cameron 已提交
187
// * Oneshots - these channels are highly optimized for the one-send use case.
188 189 190
//              They contain as few atomics as possible and involve one and
//              exactly one allocation.
// * Streams - these channels are optimized for the non-shared use case. They
191
//             use a different concurrent queue that is more tailored for this
192 193 194 195 196 197
//             use case. The initial allocation of this flavor of channel is not
//             optimized.
// * Shared - this is the most general form of channel that this module offers,
//            a channel with multiple senders. This type is as optimized as it
//            can be, but the previous two types mentioned are much faster for
//            their use-cases.
A
Alex Crichton 已提交
198 199 200
//
// ## Concurrent queues
//
201
// The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
A
Alex Crichton 已提交
202 203 204 205
// recv() obviously blocks. This means that under the hood there must be some
// shared and concurrent queue holding all of the actual data.
//
// With two flavors of channels, two flavors of queues are also used. We have
206
// chosen to use queues from a well-known author that are abbreviated as SPSC
A
Alex Crichton 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
// and MPSC (single producer, single consumer and multiple producer, single
// consumer). SPSC queues are used for streams while MPSC queues are used for
// shared channels.
//
// ### SPSC optimizations
//
// The SPSC queue found online is essentially a linked list of nodes where one
// half of the nodes are the "queue of data" and the other half of nodes are a
// cache of unused nodes. The unused nodes are used such that an allocation is
// not required on every push() and a free doesn't need to happen on every
// pop().
//
// As found online, however, the cache of nodes is of an infinite size. This
// means that if a channel at one point in its life had 50k items in the queue,
// then the queue will always have the capacity for 50k items. I believed that
// this was an unnecessary limitation of the implementation, so I have altered
// the queue to optionally have a bound on the cache size.
//
// By default, streams will have an unbounded SPSC queue with a small-ish cache
// size. The hope is that the cache is still large enough to have very fast
// send() operations while not too large such that millions of channels can
// coexist at once.
//
// ### MPSC optimizations
//
// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
// a linked list under the hood to earn its unboundedness, but I have not put
// forth much effort into having a cache of nodes similar to the SPSC queue.
//
// For now, I believe that this is "ok" because shared channels are not the most
// common type, but soon we may wish to revisit this queue choice and determine
// another candidate for backend storage of shared channels.
//
// ## Overview of the Implementation
//
// Now that there's a little background on the concurrent queues used, it's
// worth going into much more detail about the channels themselves. The basic
// pseudocode for a send/recv are:
//
//
//      send(t)                             recv()
//        queue.push(t)                       return if queue.pop()
//        if increment() == -1                deschedule {
//          wakeup()                            if decrement() > 0
//                                                cancel_deschedule()
//                                            }
//                                            queue.pop()
//
// As mentioned before, there are no locks in this implementation, only atomic
// instructions are used.
//
// ### The internal atomic counter
//
260 261 262
// Every channel has a shared counter with each half to keep track of the size
// of the queue. This counter is used to abort descheduling by the receiver and
// to know when to wake up on the sending side.
A
Alex Crichton 已提交
263 264 265 266 267 268 269 270 271 272 273 274 275 276
//
// As seen in the pseudocode, senders will increment this count and receivers
// will decrement the count. The theory behind this is that if a sender sees a
// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
// then it doesn't need to block.
//
// The recv() method has a beginning call to pop(), and if successful, it needs
// to decrement the count. It is a crucial implementation detail that this
// decrement does *not* happen to the shared counter. If this were the case,
// then it would be possible for the counter to be very negative when there were
// no receivers waiting, in which case the senders would have to determine when
// it was actually appropriate to wake up a receiver.
//
// Instead, the "steal count" is kept track of separately (not atomically
277
// because it's only used by receivers), and then the decrement() call when
A
Alex Crichton 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
// descheduling will lump in all of the recent steals into one large decrement.
//
// The implication of this is that if a sender sees a -1 count, then there's
// guaranteed to be a waiter waiting!
//
// ## Native Implementation
//
// A major goal of these channels is to work seamlessly on and off the runtime.
// All of the previous race conditions have been worded in terms of
// scheduler-isms (which is obviously not available without the runtime).
//
// For now, native usage of channels (off the runtime) will fall back onto
// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
// is still entirely lock-free, the "deschedule" blocks above are surrounded by
// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
// condition variable.
//
// ## Select
//
// Being able to support selection over channels has greatly influenced this
// design, and not only does selection need to work inside the runtime, but also
// outside the runtime.
//
// The implementation is fairly straightforward. The goal of select() is not to
// return some data, but only to return which channel can receive data without
// blocking. The implementation is essentially the entire blocking procedure
// followed by an increment as soon as it's woken up. The cancellation procedure
// involves an increment and swapping out of to_wake to acquire ownership of the
// task to unblock.
//
// Sadly this current implementation requires multiple allocations, so I have
// seen the throughput of select() be much worse than it should be. I do not
310
// believe that there is anything fundamental that needs to change about these
A
Alex Crichton 已提交
311 312 313 314 315 316 317
// channels, however, in order to support a more efficient select().
//
// # Conclusion
//
// And now that you've seen all the races that I found and attempted to fix,
// here's the code for you to find some more!

A
Alex Crichton 已提交
318
use prelude::v1::*;
319

A
Alex Crichton 已提交
320 321
use sync::Arc;
use fmt;
N
Nick Cameron 已提交
322
use marker;
A
Alex Crichton 已提交
323 324
use mem;
use cell::UnsafeCell;
A
Alex Crichton 已提交
325

A
Aaron Turon 已提交
326 327 328 329
pub use self::select::{Select, Handle};
use self::select::StartResult;
use self::select::StartResult::*;
use self::blocking::SignalToken;
A
Alex Crichton 已提交
330

331
mod blocking;
332
mod oneshot;
A
Alex Crichton 已提交
333
mod select;
334
mod shared;
A
Alex Crichton 已提交
335
mod stream;
336
mod sync;
A
Alex Crichton 已提交
337 338
mod mpsc_queue;
mod spsc_queue;
A
Alex Crichton 已提交
339 340 341

/// The receiving-half of Rust's channel type. This half can only be owned by
/// one task
A
Alex Crichton 已提交
342
#[stable]
343
pub struct Receiver<T> {
344
    inner: UnsafeCell<Flavor<T>>,
A
Alex Crichton 已提交
345 346
}

F
Flavio Percoco 已提交
347 348
// The receiver port can be sent from place to place, so long as it
// is not used to receive non-sendable things.
F
Flavio Percoco 已提交
349
unsafe impl<T:Send> Send for Receiver<T> { }
F
Flavio Percoco 已提交
350

351
/// An iterator over messages on a receiver, this iterator will block
A
Alex Crichton 已提交
352 353
/// whenever `next` is called, waiting for a new message, and `None` will be
/// returned when the corresponding channel has hung up.
A
Alex Crichton 已提交
354 355
#[stable]
pub struct Iter<'a, T:'a> {
356 357 358
    rx: &'a Receiver<T>
}

359 360
/// The sending-half of Rust's asynchronous channel type. This half can only be
/// owned by one task, but it can be cloned to send to other tasks.
A
Alex Crichton 已提交
361
#[stable]
362
pub struct Sender<T> {
363
    inner: UnsafeCell<Flavor<T>>,
A
Alex Crichton 已提交
364 365
}

F
Flavio Percoco 已提交
366 367
// The send port can be sent from place to place, so long as it
// is not used to send non-sendable things.
F
Flavio Percoco 已提交
368
unsafe impl<T:Send> Send for Sender<T> { }
F
Flavio Percoco 已提交
369

370 371
/// The sending-half of Rust's synchronous channel type. This half can only be
/// owned by one task, but it can be cloned to send to other tasks.
A
Alex Crichton 已提交
372
#[stable]
373
pub struct SyncSender<T> {
F
Flavio Percoco 已提交
374
    inner: Arc<RacyCell<sync::Packet<T>>>,
375 376
}

377 378
// Negative impl: `SyncSender` is explicitly opted out of `Sync`.
impl<T> !marker::Sync for SyncSender<T> {}

A
Alex Crichton 已提交
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
/// An error returned from the `send` function on channels.
///
/// A `send` operation can only fail if the receiving end of a channel is
/// disconnected, implying that the data could never be received. The error
/// contains the data being sent as a payload so it can be recovered; the
/// payload is exposed as the public tuple field `.0`.
#[derive(PartialEq, Eq)]
#[stable]
pub struct SendError<T>(pub T);

/// An error returned from the `recv` function on a `Receiver`.
///
/// The `recv` operation can only fail if the sending half of a channel is
/// disconnected, implying that no further messages will ever be received.
///
/// This is a unit struct and carries no payload.
#[derive(PartialEq, Eq, Clone, Copy)]
#[stable]
pub struct RecvError;

396 397
/// This enumeration is the list of the possible reasons that try_recv could not
/// return data when called.
A
Alex Crichton 已提交
398 399
#[derive(PartialEq, Clone, Copy)]
#[stable]
400
pub enum TryRecvError {
401 402
    /// This channel is currently empty, but the sender(s) have not yet
    /// disconnected, so data may yet become available.
A
Alex Crichton 已提交
403
    #[stable]
404
    Empty,
A
Alex Crichton 已提交
405

406 407
    /// This channel's sending half has become disconnected, and there will
    /// never be any more data received on this channel
A
Alex Crichton 已提交
408
    #[stable]
409 410 411
    Disconnected,
}

412
/// This enumeration is the list of the possible error outcomes for the
413
/// `SyncSender::try_send` method.
A
Alex Crichton 已提交
414 415
#[derive(PartialEq, Clone)]
#[stable]
416
pub enum TrySendError<T> {
417 418 419 420 421 422
    /// The data could not be sent on the channel because it would require that
    /// the callee block to send the data.
    ///
    /// If this is a buffered channel, then the buffer is full at this time. If
    /// this is not a buffered channel, then there is no receiver available to
    /// acquire the data.
A
Alex Crichton 已提交
423
    #[stable]
424
    Full(T),
A
Alex Crichton 已提交
425

426 427
    /// This channel's receiving half has disconnected, so the data could not be
    /// sent. The data is returned back to the callee in this case.
A
Alex Crichton 已提交
428 429
    #[stable]
    Disconnected(T),
430 431
}

432
enum Flavor<T> {
F
Flavio Percoco 已提交
433 434 435 436
    Oneshot(Arc<RacyCell<oneshot::Packet<T>>>),
    Stream(Arc<RacyCell<stream::Packet<T>>>),
    Shared(Arc<RacyCell<shared::Packet<T>>>),
    Sync(Arc<RacyCell<sync::Packet<T>>>),
A
Alex Crichton 已提交
437 438
}

439 440
#[doc(hidden)]
trait UnsafeFlavor<T> {
441
    fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>;
A
Aaron Turon 已提交
442
    unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> {
443 444 445 446 447 448 449
        &mut *self.inner_unsafe().get()
    }
    unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> {
        &*self.inner_unsafe().get()
    }
}
impl<T> UnsafeFlavor<T> for Sender<T> {
450
    fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
451 452 453 454
        &self.inner
    }
}
impl<T> UnsafeFlavor<T> for Receiver<T> {
455
    fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> {
456 457 458 459
        &self.inner
    }
}

460 461 462 463 464 465 466 467
/// Creates a new asynchronous channel, returning the sender/receiver halves.
///
/// All data sent on the sender will become available on the receiver, and no
/// send will block the calling task (this channel has an "infinite buffer").
///
/// # Example
///
/// ```
A
Alex Crichton 已提交
468
/// use std::sync::mpsc::channel;
469 470
/// use std::thread::Thread;
///
R
Robert Clipsham 已提交
471 472
/// // tx is is the sending half (tx for transmission), and rx is the receiving
/// // half (rx for receiving).
473 474 475
/// let (tx, rx) = channel();
///
/// // Spawn off an expensive computation
476
/// Thread::spawn(move|| {
477
/// #   fn expensive_computation() {}
A
Alex Crichton 已提交
478
///     tx.send(expensive_computation()).unwrap();
A
Aaron Turon 已提交
479
/// });
480 481 482 483
///
/// // Do some useful work for awhile
///
/// // Let's see what that answer was
A
Alex Crichton 已提交
484
/// println!("{:?}", rx.recv().unwrap());
485
/// ```
A
Alex Crichton 已提交
486
#[stable]
487
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
F
Flavio Percoco 已提交
488
    let a = Arc::new(RacyCell::new(oneshot::Packet::new()));
N
Nick Cameron 已提交
489
    (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
490
}
491

492 493 494 495 496 497 498 499 500 501 502 503
/// Creates a new synchronous, bounded channel.
///
/// Like asynchronous channels, the `Receiver` will block until a message
/// becomes available. These channels differ greatly in the semantics of the
/// sender from asynchronous channels, however.
///
/// This channel has an internal buffer on which messages will be queued. When
/// the internal buffer becomes full, future sends will *block* waiting for the
/// buffer to open up. Note that a buffer size of 0 is valid, in which case this
/// becomes  "rendezvous channel" where each send will not return until a recv
/// is paired with it.
///
S
Steve Klabnik 已提交
504
/// As with asynchronous channels, all senders will panic in `send` if the
505 506 507 508 509
/// `Receiver` has been destroyed.
///
/// # Example
///
/// ```
A
Alex Crichton 已提交
510
/// use std::sync::mpsc::sync_channel;
511 512
/// use std::thread::Thread;
///
513 514 515
/// let (tx, rx) = sync_channel(1);
///
/// // this returns immediately
A
Alex Crichton 已提交
516
/// tx.send(1i).unwrap();
517
///
518
/// Thread::spawn(move|| {
519
///     // this will block until the previous message has been received
A
Alex Crichton 已提交
520
///     tx.send(2i).unwrap();
A
Aaron Turon 已提交
521
/// });
522
///
A
Alex Crichton 已提交
523 524
/// assert_eq!(rx.recv().unwrap(), 1i);
/// assert_eq!(rx.recv().unwrap(), 2i);
525
/// ```
A
Alex Crichton 已提交
526
#[stable]
527
pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
F
Flavio Percoco 已提交
528
    let a = Arc::new(RacyCell::new(sync::Packet::new(bound)));
N
Nick Cameron 已提交
529
    (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
530 531 532 533 534 535
}

////////////////////////////////////////////////////////////////////////////////
// Sender
////////////////////////////////////////////////////////////////////////////////

536
impl<T: Send> Sender<T> {
537
    fn new(inner: Flavor<T>) -> Sender<T> {
538 539 540
        Sender {
            inner: UnsafeCell::new(inner),
        }
A
Alex Crichton 已提交
541 542
    }

543 544
    /// Attempts to send a value on this channel, returning it back if it could
    /// not be sent.
A
Alex Crichton 已提交
545
    ///
546 547 548
    /// A successful send occurs when it is determined that the other end of
    /// the channel has not hung up already. An unsuccessful send would be one
    /// where the corresponding receiver has already been deallocated. Note
549 550
    /// that a return value of `Err` means that the data will never be
    /// received, but a return value of `Ok` does *not* mean that the data
551
    /// will be received.  It is possible for the corresponding receiver to
552
    /// hang up immediately after this function returns `Ok`.
A
Alex Crichton 已提交
553
    ///
A
Alex Crichton 已提交
554
    /// This method will never block the current thread.
555 556 557 558
    ///
    /// # Example
    ///
    /// ```
A
Alex Crichton 已提交
559 560
    /// use std::sync::mpsc::channel;
    ///
561 562 563
    /// let (tx, rx) = channel();
    ///
    /// // This send is always successful
A
Alex Crichton 已提交
564
    /// tx.send(1i).unwrap();
565 566 567
    ///
    /// // This send will fail because the receiver is gone
    /// drop(rx);
A
Alex Crichton 已提交
568
    /// assert_eq!(tx.send(1i).err().unwrap().0, 1);
569
    /// ```
570
    #[stable]
571
    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
572
        let (new_inner, ret) = match *unsafe { self.inner() } {
N
Nick Cameron 已提交
573
            Flavor::Oneshot(ref p) => {
574
                unsafe {
575
                    let p = p.get();
576
                    if !(*p).sent() {
A
Alex Crichton 已提交
577
                        return (*p).send(t).map_err(SendError);
578
                    } else {
F
Flavio Percoco 已提交
579 580
                        let a =
                            Arc::new(RacyCell::new(stream::Packet::new()));
A
Alex Crichton 已提交
581 582
                        let rx = Receiver::new(Flavor::Stream(a.clone()));
                        match (*p).upgrade(rx) {
583
                            oneshot::UpSuccess => {
584 585
                                let ret = (*a.get()).send(t);
                                (a, ret)
586
                            }
587
                            oneshot::UpDisconnected => (a, Err(t)),
A
Aaron Turon 已提交
588 589
                            oneshot::UpWoke(token) => {
                                // This send cannot panic because the thread is
590 591
                                // asleep (we're looking at it), so the receiver
                                // can't go away.
592
                                (*a.get()).send(t).ok().unwrap();
A
Alex Crichton 已提交
593
                        token.signal();
594
                                (a, Ok(()))
595 596
                            }
                        }
A
Alex Crichton 已提交
597 598 599
                    }
                }
            }
A
Alex Crichton 已提交
600 601 602 603 604 605
            Flavor::Stream(ref p) => return unsafe {
                (*p.get()).send(t).map_err(SendError)
            },
            Flavor::Shared(ref p) => return unsafe {
                (*p.get()).send(t).map_err(SendError)
            },
N
Nick Cameron 已提交
606
            Flavor::Sync(..) => unreachable!(),
607
        };
A
Alex Crichton 已提交
608

609
        unsafe {
N
Nick Cameron 已提交
610
            let tmp = Sender::new(Flavor::Stream(new_inner));
A
Aaron Turon 已提交
611
            mem::swap(self.inner_mut(), tmp.inner_mut());
A
Alex Crichton 已提交
612
        }
A
Alex Crichton 已提交
613
        ret.map_err(SendError)
A
Alex Crichton 已提交
614
    }
615
}
A
Alex Crichton 已提交
616

A
Aaron Turon 已提交
617
#[stable]
618 619
impl<T: Send> Clone for Sender<T> {
    fn clone(&self) -> Sender<T> {
A
Alex Crichton 已提交
620
        let (packet, sleeper, guard) = match *unsafe { self.inner() } {
N
Nick Cameron 已提交
621
            Flavor::Oneshot(ref p) => {
F
Flavio Percoco 已提交
622
                let a = Arc::new(RacyCell::new(shared::Packet::new()));
623
                unsafe {
A
Alex Crichton 已提交
624
                    let guard = (*a.get()).postinit_lock();
A
Alex Crichton 已提交
625 626
                    let rx = Receiver::new(Flavor::Shared(a.clone()));
                    match (*p.get()).upgrade(rx) {
A
Alex Crichton 已提交
627 628 629
                        oneshot::UpSuccess |
                        oneshot::UpDisconnected => (a, None, guard),
                        oneshot::UpWoke(task) => (a, Some(task), guard)
630
                    }
631
                }
A
Alex Crichton 已提交
632
            }
N
Nick Cameron 已提交
633
            Flavor::Stream(ref p) => {
F
Flavio Percoco 已提交
634
                let a = Arc::new(RacyCell::new(shared::Packet::new()));
635
                unsafe {
A
Alex Crichton 已提交
636
                    let guard = (*a.get()).postinit_lock();
A
Alex Crichton 已提交
637 638
                    let rx = Receiver::new(Flavor::Shared(a.clone()));
                    match (*p.get()).upgrade(rx) {
A
Alex Crichton 已提交
639 640 641
                        stream::UpSuccess |
                        stream::UpDisconnected => (a, None, guard),
                        stream::UpWoke(task) => (a, Some(task), guard),
642
                    }
A
Alex Crichton 已提交
643 644
                }
            }
N
Nick Cameron 已提交
645
            Flavor::Shared(ref p) => {
646
                unsafe { (*p.get()).clone_chan(); }
N
Nick Cameron 已提交
647
                return Sender::new(Flavor::Shared(p.clone()));
648
            }
N
Nick Cameron 已提交
649
            Flavor::Sync(..) => unreachable!(),
650 651 652
        };

        unsafe {
A
Alex Crichton 已提交
653
            (*packet.get()).inherit_blocker(sleeper, guard);
A
Alex Crichton 已提交
654

N
Nick Cameron 已提交
655
            let tmp = Sender::new(Flavor::Shared(packet.clone()));
A
Aaron Turon 已提交
656
            mem::swap(self.inner_mut(), tmp.inner_mut());
657
        }
N
Nick Cameron 已提交
658
        Sender::new(Flavor::Shared(packet))
A
Alex Crichton 已提交
659 660 661 662
    }
}

#[unsafe_destructor]
663
#[stable]
664
impl<T: Send> Drop for Sender<T> {
A
Alex Crichton 已提交
665
    fn drop(&mut self) {
A
Aaron Turon 已提交
666
        match *unsafe { self.inner_mut() } {
N
Nick Cameron 已提交
667 668 669 670
            Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
            Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
            Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
            Flavor::Sync(..) => unreachable!(),
671
        }
A
Alex Crichton 已提交
672 673 674
    }
}

675 676 677 678 679 680
////////////////////////////////////////////////////////////////////////////////
// SyncSender
////////////////////////////////////////////////////////////////////////////////

impl<T: Send> SyncSender<T> {

681 682 683 684
    fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> {
        SyncSender { inner: inner }
    }

685 686 687 688 689 690
    /// Sends a value on this synchronous channel.
    ///
    /// This function will *block* until space in the internal buffer becomes
    /// available or a receiver is available to hand off the message to.
    ///
    /// Note that a successful send does *not* guarantee that the receiver will
A
Alex Crichton 已提交
691
    /// ever see the data if there is a buffer on this channel. Items may be
692 693 694 695
    /// enqueued in the internal buffer for the receiver to receive at a later
    /// time. If the buffer size is 0, however, it can be guaranteed that the
    /// receiver has indeed received the data if this function returns success.
    ///
A
Alex Crichton 已提交
696 697 698 699 700 701
    /// This function will never panic, but it may return `Err` if the
    /// `Receiver` has disconnected and is no longer able to receive
    /// information.
    #[stable]
    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
        unsafe { (*self.inner.get()).send(t).map_err(SendError) }
702 703 704 705
    }

    /// Attempts to send a value on this channel without blocking.
    ///
A
Alex Crichton 已提交
706
    /// This method differs from `send` by returning immediately if the
707
    /// channel's buffer is full or no receiver is waiting to acquire some
A
Alex Crichton 已提交
708
    /// data. Compared with `send`, this function has two failure cases
709
    /// instead of one (one for disconnection, one for a full buffer).
710 711 712
    ///
    /// See `SyncSender::send` for notes about guarantees of whether the
    /// receiver has received the data or not if this function is successful.
A
Alex Crichton 已提交
713
    #[stable]
714
    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
715 716 717 718
        unsafe { (*self.inner.get()).try_send(t) }
    }
}

A
Aaron Turon 已提交
719
#[stable]
720 721 722 723 724 725 726 727
impl<T: Send> Clone for SyncSender<T> {
    fn clone(&self) -> SyncSender<T> {
        unsafe { (*self.inner.get()).clone_chan(); }
        return SyncSender::new(self.inner.clone());
    }
}

#[unsafe_destructor]
728
#[stable]
729 730 731 732 733 734 735 736 737 738
impl<T: Send> Drop for SyncSender<T> {
    fn drop(&mut self) {
        unsafe { (*self.inner.get()).drop_chan(); }
    }
}

////////////////////////////////////////////////////////////////////////////////
// Receiver
////////////////////////////////////////////////////////////////////////////////

739
impl<T: Send> Receiver<T> {
740
    fn new(inner: Flavor<T>) -> Receiver<T> {
F
Flavio Percoco 已提交
741
        Receiver { inner: UnsafeCell::new(inner) }
742 743
    }

744
    /// Attempts to return a pending value on this receiver without blocking
A
Alex Crichton 已提交
745 746 747 748 749 750
    ///
    /// This method will never block the caller in order to wait for data to
    /// become available. Instead, this will always return immediately with a
    /// possible option of pending data on the channel.
    ///
    /// This is useful for a flavor of "optimistic check" before deciding to
751
    /// block on a receiver.
A
Alex Crichton 已提交
752
    #[stable]
753
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
754
        loop {
755
            let new_port = match *unsafe { self.inner() } {
N
Nick Cameron 已提交
756
                Flavor::Oneshot(ref p) => {
757
                    match unsafe { (*p.get()).try_recv() } {
758
                        Ok(t) => return Ok(t),
A
Alex Crichton 已提交
759 760 761 762
                        Err(oneshot::Empty) => return Err(TryRecvError::Empty),
                        Err(oneshot::Disconnected) => {
                            return Err(TryRecvError::Disconnected)
                        }
763
                        Err(oneshot::Upgraded(rx)) => rx,
A
Alex Crichton 已提交
764 765
                    }
                }
N
Nick Cameron 已提交
766
                Flavor::Stream(ref p) => {
767
                    match unsafe { (*p.get()).try_recv() } {
768
                        Ok(t) => return Ok(t),
A
Alex Crichton 已提交
769 770 771 772
                        Err(stream::Empty) => return Err(TryRecvError::Empty),
                        Err(stream::Disconnected) => {
                            return Err(TryRecvError::Disconnected)
                        }
773
                        Err(stream::Upgraded(rx)) => rx,
774 775
                    }
                }
N
Nick Cameron 已提交
776
                Flavor::Shared(ref p) => {
777
                    match unsafe { (*p.get()).try_recv() } {
778
                        Ok(t) => return Ok(t),
A
Alex Crichton 已提交
779 780 781 782
                        Err(shared::Empty) => return Err(TryRecvError::Empty),
                        Err(shared::Disconnected) => {
                            return Err(TryRecvError::Disconnected)
                        }
783 784
                    }
                }
N
Nick Cameron 已提交
785
                Flavor::Sync(ref p) => {
786
                    match unsafe { (*p.get()).try_recv() } {
787
                        Ok(t) => return Ok(t),
A
Alex Crichton 已提交
788 789 790 791
                        Err(sync::Empty) => return Err(TryRecvError::Empty),
                        Err(sync::Disconnected) => {
                            return Err(TryRecvError::Disconnected)
                        }
792 793
                    }
                }
794 795
            };
            unsafe {
A
Aaron Turon 已提交
796 797
                mem::swap(self.inner_mut(),
                          new_port.inner_mut());
798 799
            }
        }
A
Alex Crichton 已提交
800 801
    }

A
Alex Crichton 已提交
802
    /// Attempt to wait for a value on this receiver, returning an error if the
A
Alex Crichton 已提交
803 804
    /// corresponding channel has hung up.
    ///
A
Alex Crichton 已提交
805 806 807 808
    /// This function will always block the current thread if there is no data
    /// available and it's possible for more data to be sent. Once a message is
    /// sent to the corresponding `Sender`, then this receiver will wake up and
    /// return that message.
A
Alex Crichton 已提交
809
    ///
A
Alex Crichton 已提交
810 811 812 813 814
    /// If the corresponding `Sender` has disconnected, or it disconnects while
    /// this call is blocking, this call will wake up and return `Err` to
    /// indicate that no more messages can ever be received on this channel.
    #[stable]
    pub fn recv(&self) -> Result<T, RecvError> {
815
        loop {
816
            let new_port = match *unsafe { self.inner() } {
N
Nick Cameron 已提交
817
                Flavor::Oneshot(ref p) => {
818
                    match unsafe { (*p.get()).recv() } {
819
                        Ok(t) => return Ok(t),
820
                        Err(oneshot::Empty) => return unreachable!(),
A
Alex Crichton 已提交
821
                        Err(oneshot::Disconnected) => return Err(RecvError),
822
                        Err(oneshot::Upgraded(rx)) => rx,
823
                    }
824
                }
N
Nick Cameron 已提交
825
                Flavor::Stream(ref p) => {
826
                    match unsafe { (*p.get()).recv() } {
827
                        Ok(t) => return Ok(t),
828
                        Err(stream::Empty) => return unreachable!(),
A
Alex Crichton 已提交
829
                        Err(stream::Disconnected) => return Err(RecvError),
830
                        Err(stream::Upgraded(rx)) => rx,
831 832
                    }
                }
N
Nick Cameron 已提交
833
                Flavor::Shared(ref p) => {
834
                    match unsafe { (*p.get()).recv() } {
835
                        Ok(t) => return Ok(t),
836
                        Err(shared::Empty) => return unreachable!(),
A
Alex Crichton 已提交
837
                        Err(shared::Disconnected) => return Err(RecvError),
838 839
                    }
                }
A
Alex Crichton 已提交
840 841 842
                Flavor::Sync(ref p) => return unsafe {
                    (*p.get()).recv().map_err(|()| RecvError)
                }
843 844
            };
            unsafe {
A
Aaron Turon 已提交
845
                mem::swap(self.inner_mut(), new_port.inner_mut());
846
            }
A
Alex Crichton 已提交
847 848 849
        }
    }

850
    /// Returns an iterator that will block waiting for messages, but never
S
Steve Klabnik 已提交
851
    /// `panic!`. It will return `None` when the channel has hung up.
A
Alex Crichton 已提交
852 853 854
    #[stable]
    pub fn iter(&self) -> Iter<T> {
        Iter { rx: self }
A
Alex Crichton 已提交
855 856 857
    }
}

858
impl<T: Send> select::Packet for Receiver<T> {
859 860
    fn can_recv(&self) -> bool {
        loop {
861
            let new_port = match *unsafe { self.inner() } {
N
Nick Cameron 已提交
862
                Flavor::Oneshot(ref p) => {
863 864 865 866 867
                    match unsafe { (*p.get()).can_recv() } {
                        Ok(ret) => return ret,
                        Err(upgrade) => upgrade,
                    }
                }
N
Nick Cameron 已提交
868
                Flavor::Stream(ref p) => {
869 870 871 872 873
                    match unsafe { (*p.get()).can_recv() } {
                        Ok(ret) => return ret,
                        Err(upgrade) => upgrade,
                    }
                }
N
Nick Cameron 已提交
874
                Flavor::Shared(ref p) => {
875 876
                    return unsafe { (*p.get()).can_recv() };
                }
N
Nick Cameron 已提交
877
                Flavor::Sync(ref p) => {
878 879
                    return unsafe { (*p.get()).can_recv() };
                }
880 881
            };
            unsafe {
A
Aaron Turon 已提交
882 883
                mem::swap(self.inner_mut(),
                          new_port.inner_mut());
884 885 886 887
            }
        }
    }

A
Aaron Turon 已提交
888
    fn start_selection(&self, mut token: SignalToken) -> StartResult {
889
        loop {
890
            let (t, new_port) = match *unsafe { self.inner() } {
N
Nick Cameron 已提交
891
                Flavor::Oneshot(ref p) => {
892 893 894
                    match unsafe { (*p.get()).start_selection(token) } {
                        oneshot::SelSuccess => return Installed,
                        oneshot::SelCanceled => return Abort,
895
                        oneshot::SelUpgraded(t, rx) => (t, rx),
896 897
                    }
                }
N
Nick Cameron 已提交
898
                Flavor::Stream(ref p) => {
899 900 901
                    match unsafe { (*p.get()).start_selection(token) } {
                        stream::SelSuccess => return Installed,
                        stream::SelCanceled => return Abort,
902
                        stream::SelUpgraded(t, rx) => (t, rx),
903 904
                    }
                }
N
Nick Cameron 已提交
905
                Flavor::Shared(ref p) => {
906
                    return unsafe { (*p.get()).start_selection(token) };
907
                }
N
Nick Cameron 已提交
908
                Flavor::Sync(ref p) => {
909
                    return unsafe { (*p.get()).start_selection(token) };
910
                }
911
            };
912
            token = t;
913
            unsafe {
914
                mem::swap(self.inner_mut(), new_port.inner_mut());
915 916 917 918 919 920 921
            }
        }
    }

    fn abort_selection(&self) -> bool {
        let mut was_upgrade = false;
        loop {
922
            let result = match *unsafe { self.inner() } {
N
Nick Cameron 已提交
923 924
                Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
                Flavor::Stream(ref p) => unsafe {
925 926
                    (*p.get()).abort_selection(was_upgrade)
                },
N
Nick Cameron 已提交
927
                Flavor::Shared(ref p) => return unsafe {
928 929
                    (*p.get()).abort_selection(was_upgrade)
                },
N
Nick Cameron 已提交
930
                Flavor::Sync(ref p) => return unsafe {
931 932
                    (*p.get()).abort_selection()
                },
933
            };
934
            let new_port = match result { Ok(b) => return b, Err(p) => p };
935 936
            was_upgrade = true;
            unsafe {
A
Aaron Turon 已提交
937 938
                mem::swap(self.inner_mut(),
                          new_port.inner_mut());
939 940 941 942 943
            }
        }
    }
}

944
#[stable]
J
Jorge Aparicio 已提交
945 946 947
impl<'a, T: Send> Iterator for Iter<'a, T> {
    type Item = T;

948
    fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
A
Alex Crichton 已提交
949 950 951
}

#[unsafe_destructor]
952
#[stable]
953
impl<T: Send> Drop for Receiver<T> {
A
Alex Crichton 已提交
954
    fn drop(&mut self) {
A
Aaron Turon 已提交
955
        match *unsafe { self.inner_mut() } {
N
Nick Cameron 已提交
956 957 958 959
            Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
            Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
            Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
            Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
960
        }
A
Alex Crichton 已提交
961 962 963
    }
}

F
Flavio Percoco 已提交
964 965
/// A version of `UnsafeCell` intended for use in concurrent data
/// structures (for example, you might put it in an `Arc`).
///
/// The wrapper adds no synchronization of its own: it only exposes a raw
/// pointer to the contents and asserts `Send`/`Sync` through the `unsafe impl`s
/// below, so every access must be coordinated by the code using it.
struct RacyCell<T>(pub UnsafeCell<T>);

impl<T> RacyCell<T> {

    /// Moves `value` into a new cell.
    fn new(value: T) -> RacyCell<T> {
        // Use the constructor rather than spelling out the cell's field, in
        // line with how the rest of this file builds an `UnsafeCell`.
        RacyCell(UnsafeCell::new(value))
    }

    /// Returns a raw mutable pointer to the wrapped value.
    ///
    /// # Safety
    ///
    /// Nothing here prevents aliased or concurrent access through the returned
    /// pointer; the caller is responsible for making each dereference sound.
    unsafe fn get(&self) -> *mut T {
        self.0.get()
    }

}

unsafe impl<T:Send> Send for RacyCell<T> { }

// NOTE(review): this impl places no bound on `T`, so the cell is declared
// shareable across threads for any contents. Soundness rests entirely on how
// callers use `get` -- confirm every use site before relying on it.
unsafe impl<T> Sync for RacyCell<T> { } // Oh dear

impl<T> fmt::Show for SendError<T> {
    /// Formats the fixed description of a failed send; the payload carried by
    /// the error is not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = "sending on a closed channel";
        message.fmt(f)
    }
}

impl<T> fmt::Show for TrySendError<T> {
    /// Formats a fixed description chosen by variant; the payload carried by
    /// the error is not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = match *self {
            TrySendError::Full(..) => "sending on a full channel",
            TrySendError::Disconnected(..) => "sending on a closed channel",
        };
        message.fmt(f)
    }
}

impl fmt::Show for RecvError {
    /// Formats the fixed description of a failed blocking receive.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = "receiving on a closed channel";
        message.fmt(f)
    }
}
F
Flavio Percoco 已提交
1008

A
Alex Crichton 已提交
1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
impl fmt::Show for TryRecvError {
    /// Formats a fixed description chosen by variant.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = match *self {
            TryRecvError::Empty => "receiving on an empty channel",
            TryRecvError::Disconnected => "receiving on a closed channel",
        };
        message.fmt(f)
    }
}
F
Flavio Percoco 已提交
1021

A
Alex Crichton 已提交
1022 1023
#[cfg(test)]
mod test {
A
Alex Crichton 已提交
1024 1025
    use prelude::v1::*;

S
Steve Klabnik 已提交
1026
    use os;
A
Alex Crichton 已提交
1027 1028
    use super::*;
    use thread::Thread;
A
Alex Crichton 已提交
1029 1030 1031

    pub fn stress_factor() -> uint {
        match os::getenv("RUST_TEST_STRESS") {
A
Alex Crichton 已提交
1032
            Some(val) => val.parse().unwrap(),
A
Alex Crichton 已提交
1033 1034 1035
            None => 1,
        }
    }
A
Alex Crichton 已提交
1036

A
Alex Crichton 已提交
1037 1038
    #[test]
    fn smoke() {
1039
        let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1040 1041 1042
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }
A
Alex Crichton 已提交
1043

A
Alex Crichton 已提交
1044 1045
    #[test]
    fn drop_full() {
1046
        let (tx, _rx) = channel();
A
Alex Crichton 已提交
1047 1048
        tx.send(box 1i).unwrap();
    }
A
Alex Crichton 已提交
1049

A
Alex Crichton 已提交
1050 1051
    #[test]
    fn drop_full_shared() {
1052 1053 1054
        let (tx, _rx) = channel();
        drop(tx.clone());
        drop(tx.clone());
A
Alex Crichton 已提交
1055 1056
        tx.send(box 1i).unwrap();
    }
A
Alex Crichton 已提交
1057

A
Alex Crichton 已提交
1058 1059
    #[test]
    fn smoke_shared() {
1060
        let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1061 1062
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
1063
        let tx = tx.clone();
A
Alex Crichton 已提交
1064 1065 1066
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }
A
Alex Crichton 已提交
1067

A
Alex Crichton 已提交
1068 1069
    #[test]
    fn smoke_threads() {
1070
        let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1071 1072
        let _t = Thread::spawn(move|| {
            tx.send(1).unwrap();
1073
        });
A
Alex Crichton 已提交
1074 1075
        assert_eq!(rx.recv().unwrap(), 1);
    }
A
Alex Crichton 已提交
1076

A
Alex Crichton 已提交
1077 1078
    #[test]
    fn smoke_port_gone() {
1079
        let (tx, rx) = channel::<int>();
1080
        drop(rx);
A
Alex Crichton 已提交
1081 1082
        assert!(tx.send(1).is_err());
    }
A
Alex Crichton 已提交
1083

A
Alex Crichton 已提交
1084 1085
    #[test]
    fn smoke_shared_port_gone() {
1086
        let (tx, rx) = channel::<int>();
1087
        drop(rx);
A
Alex Crichton 已提交
1088 1089
        assert!(tx.send(1).is_err())
    }
A
Alex Crichton 已提交
1090

A
Alex Crichton 已提交
1091 1092
    #[test]
    fn smoke_shared_port_gone2() {
1093
        let (tx, rx) = channel::<int>();
1094 1095 1096
        drop(rx);
        let tx2 = tx.clone();
        drop(tx);
A
Alex Crichton 已提交
1097 1098
        assert!(tx2.send(1).is_err());
    }
A
Alex Crichton 已提交
1099

A
Alex Crichton 已提交
1100 1101
    #[test]
    fn port_gone_concurrent() {
1102
        let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1103 1104
        let _t = Thread::spawn(move|| {
            rx.recv().unwrap();
1105
        });
A
Alex Crichton 已提交
1106 1107
        while tx.send(1).is_ok() {}
    }
A
Alex Crichton 已提交
1108

A
Alex Crichton 已提交
1109 1110
    #[test]
    fn port_gone_concurrent_shared() {
1111
        let (tx, rx) = channel::<int>();
1112
        let tx2 = tx.clone();
A
Alex Crichton 已提交
1113 1114
        let _t = Thread::spawn(move|| {
            rx.recv().unwrap();
1115
        });
A
Alex Crichton 已提交
1116 1117
        while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
    }
A
Alex Crichton 已提交
1118

A
Alex Crichton 已提交
1119 1120
    #[test]
    fn smoke_chan_gone() {
1121 1122
        let (tx, rx) = channel::<int>();
        drop(tx);
A
Alex Crichton 已提交
1123 1124
        assert!(rx.recv().is_err());
    }
A
Alex Crichton 已提交
1125

A
Alex Crichton 已提交
1126 1127
    #[test]
    fn smoke_chan_gone_shared() {
1128 1129 1130 1131
        let (tx, rx) = channel::<()>();
        let tx2 = tx.clone();
        drop(tx);
        drop(tx2);
A
Alex Crichton 已提交
1132 1133
        assert!(rx.recv().is_err());
    }
A
Alex Crichton 已提交
1134

A
Alex Crichton 已提交
1135 1136
    #[test]
    fn chan_gone_concurrent() {
1137
        let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1138 1139 1140
        let _t = Thread::spawn(move|| {
            tx.send(1).unwrap();
            tx.send(1).unwrap();
1141
        });
A
Alex Crichton 已提交
1142 1143
        while rx.recv().is_ok() {}
    }
A
Alex Crichton 已提交
1144

A
Alex Crichton 已提交
1145 1146
    #[test]
    fn stress() {
1147
        let (tx, rx) = channel::<int>();
A
Aaron Turon 已提交
1148
        let t = Thread::scoped(move|| {
A
Alex Crichton 已提交
1149
            for _ in range(0u, 10000) { tx.send(1i).unwrap(); }
1150
        });
1151
        for _ in range(0u, 10000) {
A
Alex Crichton 已提交
1152
            assert_eq!(rx.recv().unwrap(), 1);
A
Alex Crichton 已提交
1153
        }
A
Alex Crichton 已提交
1154 1155
        t.join().ok().unwrap();
    }
A
Alex Crichton 已提交
1156

A
Alex Crichton 已提交
1157 1158
    #[test]
    fn stress_shared() {
A
Alex Crichton 已提交
1159 1160
        static AMT: uint = 10000;
        static NTHREADS: uint = 8;
1161
        let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1162

A
Aaron Turon 已提交
1163
        let t = Thread::scoped(move|| {
A
Alex Crichton 已提交
1164
            for _ in range(0, AMT * NTHREADS) {
A
Alex Crichton 已提交
1165
                assert_eq!(rx.recv().unwrap(), 1);
A
Alex Crichton 已提交
1166
            }
1167
            match rx.try_recv() {
S
Steve Klabnik 已提交
1168
                Ok(..) => panic!(),
1169 1170
                _ => {}
            }
1171
        });
A
Alex Crichton 已提交
1172 1173

        for _ in range(0, NTHREADS) {
1174
            let tx = tx.clone();
A
Alex Crichton 已提交
1175 1176
            Thread::spawn(move|| {
                for _ in range(0, AMT) { tx.send(1).unwrap(); }
A
Aaron Turon 已提交
1177
            });
A
Alex Crichton 已提交
1178
        }
1179
        drop(tx);
A
Alex Crichton 已提交
1180 1181
        t.join().ok().unwrap();
    }
A
Alex Crichton 已提交
1182 1183 1184

    #[test]
    fn send_from_outside_runtime() {
1185 1186
        let (tx1, rx1) = channel::<()>();
        let (tx2, rx2) = channel::<int>();
A
Aaron Turon 已提交
1187
        let t1 = Thread::scoped(move|| {
A
Alex Crichton 已提交
1188
            tx1.send(()).unwrap();
1189
            for _ in range(0i, 40) {
A
Alex Crichton 已提交
1190
                assert_eq!(rx2.recv().unwrap(), 1);
A
Alex Crichton 已提交
1191
            }
1192
        });
A
Alex Crichton 已提交
1193
        rx1.recv().unwrap();
A
Aaron Turon 已提交
1194
        let t2 = Thread::scoped(move|| {
1195
            for _ in range(0i, 40) {
A
Alex Crichton 已提交
1196
                tx2.send(1).unwrap();
A
Alex Crichton 已提交
1197
            }
1198
        });
A
Alex Crichton 已提交
1199 1200
        t1.join().ok().unwrap();
        t2.join().ok().unwrap();
A
Alex Crichton 已提交
1201 1202 1203 1204
    }

    #[test]
    fn recv_from_outside_runtime() {
1205
        let (tx, rx) = channel::<int>();
A
Aaron Turon 已提交
1206
        let t = Thread::scoped(move|| {
1207
            for _ in range(0i, 40) {
A
Alex Crichton 已提交
1208
                assert_eq!(rx.recv().unwrap(), 1);
A
Alex Crichton 已提交
1209
            }
1210
        });
1211
        for _ in range(0u, 40) {
A
Alex Crichton 已提交
1212
            tx.send(1).unwrap();
A
Alex Crichton 已提交
1213
        }
A
Alex Crichton 已提交
1214
        t.join().ok().unwrap();
A
Alex Crichton 已提交
1215 1216 1217 1218
    }

    #[test]
    fn no_runtime() {
1219 1220
        let (tx1, rx1) = channel::<int>();
        let (tx2, rx2) = channel::<int>();
A
Aaron Turon 已提交
1221
        let t1 = Thread::scoped(move|| {
A
Alex Crichton 已提交
1222 1223
            assert_eq!(rx1.recv().unwrap(), 1);
            tx2.send(2).unwrap();
1224
        });
A
Aaron Turon 已提交
1225
        let t2 = Thread::scoped(move|| {
A
Alex Crichton 已提交
1226 1227
            tx1.send(1).unwrap();
            assert_eq!(rx2.recv().unwrap(), 2);
1228
        });
A
Alex Crichton 已提交
1229 1230
        t1.join().ok().unwrap();
        t2.join().ok().unwrap();
A
Alex Crichton 已提交
1231 1232
    }

A
Alex Crichton 已提交
1233 1234
    #[test]
    fn oneshot_single_thread_close_port_first() {
A
Alex Crichton 已提交
1235
        // Simple test of closing without sending
1236 1237
        let (_tx, rx) = channel::<int>();
        drop(rx);
A
Alex Crichton 已提交
1238
    }
A
Alex Crichton 已提交
1239

A
Alex Crichton 已提交
1240 1241
    #[test]
    fn oneshot_single_thread_close_chan_first() {
A
Alex Crichton 已提交
1242
        // Simple test of closing without sending
1243 1244
        let (tx, _rx) = channel::<int>();
        drop(tx);
A
Alex Crichton 已提交
1245
    }
A
Alex Crichton 已提交
1246

A
Alex Crichton 已提交
1247 1248
    #[test]
    fn oneshot_single_thread_send_port_close() {
A
Alex Crichton 已提交
1249
        // Testing that the sender cleans up the payload if receiver is closed
1250
        let (tx, rx) = channel::<Box<int>>();
1251
        drop(rx);
A
Alex Crichton 已提交
1252 1253
        assert!(tx.send(box 0).is_err());
    }
A
Alex Crichton 已提交
1254

A
Alex Crichton 已提交
1255 1256
    #[test]
    fn oneshot_single_thread_recv_chan_close() {
S
Steve Klabnik 已提交
1257
        // Receiving on a closed chan will panic
A
Aaron Turon 已提交
1258
        let res = Thread::scoped(move|| {
1259 1260
            let (tx, rx) = channel::<int>();
            drop(tx);
A
Alex Crichton 已提交
1261
            rx.recv().unwrap();
A
Aaron Turon 已提交
1262
        }).join();
A
Alex Crichton 已提交
1263 1264
        // What is our res?
        assert!(res.is_err());
A
Alex Crichton 已提交
1265
    }
A
Alex Crichton 已提交
1266

A
Alex Crichton 已提交
1267 1268
    #[test]
    fn oneshot_single_thread_send_then_recv() {
1269
        let (tx, rx) = channel::<Box<int>>();
A
Alex Crichton 已提交
1270 1271 1272
        tx.send(box 10).unwrap();
        assert!(rx.recv().unwrap() == box 10);
    }
A
Alex Crichton 已提交
1273

A
Alex Crichton 已提交
1274 1275
    #[test]
    fn oneshot_single_thread_try_send_open() {
1276
        let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1277 1278 1279
        assert!(tx.send(10).is_ok());
        assert!(rx.recv().unwrap() == 10);
    }
A
Alex Crichton 已提交
1280

A
Alex Crichton 已提交
1281 1282
    #[test]
    fn oneshot_single_thread_try_send_closed() {
1283 1284
        let (tx, rx) = channel::<int>();
        drop(rx);
A
Alex Crichton 已提交
1285 1286
        assert!(tx.send(10).is_err());
    }
A
Alex Crichton 已提交
1287

A
Alex Crichton 已提交
1288 1289
    #[test]
    fn oneshot_single_thread_try_recv_open() {
1290
        let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1291 1292 1293
        tx.send(10).unwrap();
        assert!(rx.recv() == Ok(10));
    }
A
Alex Crichton 已提交
1294

A
Alex Crichton 已提交
1295 1296
    #[test]
    fn oneshot_single_thread_try_recv_closed() {
1297 1298
        let (tx, rx) = channel::<int>();
        drop(tx);
A
Alex Crichton 已提交
1299 1300
        assert!(rx.recv().is_err());
    }
A
Alex Crichton 已提交
1301

A
Alex Crichton 已提交
1302 1303
    #[test]
    fn oneshot_single_thread_peek_data() {
1304
        let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1305 1306
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        tx.send(10).unwrap();
1307
        assert_eq!(rx.try_recv(), Ok(10));
A
Alex Crichton 已提交
1308
    }
A
Alex Crichton 已提交
1309

A
Alex Crichton 已提交
1310 1311
    #[test]
    fn oneshot_single_thread_peek_close() {
1312 1313
        let (tx, rx) = channel::<int>();
        drop(tx);
A
Alex Crichton 已提交
1314 1315 1316
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    }
A
Alex Crichton 已提交
1317

A
Alex Crichton 已提交
1318 1319
    #[test]
    fn oneshot_single_thread_peek_open() {
1320
        let (_tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1321 1322
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    }
A
Alex Crichton 已提交
1323

A
Alex Crichton 已提交
1324 1325
    #[test]
    fn oneshot_multi_task_recv_then_send() {
1326
        let (tx, rx) = channel::<Box<int>>();
A
Alex Crichton 已提交
1327 1328
        let _t = Thread::spawn(move|| {
            assert!(rx.recv().unwrap() == box 10);
1329
        });
A
Alex Crichton 已提交
1330

A
Alex Crichton 已提交
1331 1332
        tx.send(box 10).unwrap();
    }
A
Alex Crichton 已提交
1333

A
Alex Crichton 已提交
1334 1335
    #[test]
    fn oneshot_multi_task_recv_then_close() {
1336
        let (tx, rx) = channel::<Box<int>>();
A
Alex Crichton 已提交
1337
        let _t = Thread::spawn(move|| {
1338
            drop(tx);
1339
        });
A
Aaron Turon 已提交
1340
        let res = Thread::scoped(move|| {
A
Alex Crichton 已提交
1341
            assert!(rx.recv().unwrap() == box 10);
A
Aaron Turon 已提交
1342
        }).join();
A
Alex Crichton 已提交
1343
        assert!(res.is_err());
A
Alex Crichton 已提交
1344
    }
A
Alex Crichton 已提交
1345

A
Alex Crichton 已提交
1346 1347
    #[test]
    fn oneshot_multi_thread_close_stress() {
B
Brendan Zabarauskas 已提交
1348
        for _ in range(0, stress_factor()) {
1349
            let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1350
            let _t = Thread::spawn(move|| {
1351
                drop(rx);
1352
            });
1353
            drop(tx);
B
Brendan Zabarauskas 已提交
1354
        }
A
Alex Crichton 已提交
1355
    }
A
Alex Crichton 已提交
1356

A
Alex Crichton 已提交
1357 1358
    #[test]
    fn oneshot_multi_thread_send_close_stress() {
B
Brendan Zabarauskas 已提交
1359
        for _ in range(0, stress_factor()) {
1360
            let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1361
            let _t = Thread::spawn(move|| {
1362
                drop(rx);
1363
            });
A
Aaron Turon 已提交
1364
            let _ = Thread::scoped(move|| {
A
Alex Crichton 已提交
1365
                tx.send(1).unwrap();
A
Aaron Turon 已提交
1366
            }).join();
B
Brendan Zabarauskas 已提交
1367
        }
A
Alex Crichton 已提交
1368
    }
A
Alex Crichton 已提交
1369

A
Alex Crichton 已提交
1370 1371
    #[test]
    fn oneshot_multi_thread_recv_close_stress() {
B
Brendan Zabarauskas 已提交
1372
        for _ in range(0, stress_factor()) {
1373
            let (tx, rx) = channel::<int>();
A
Alex Crichton 已提交
1374
            Thread::spawn(move|| {
A
Aaron Turon 已提交
1375
                let res = Thread::scoped(move|| {
A
Alex Crichton 已提交
1376
                    rx.recv().unwrap();
A
Aaron Turon 已提交
1377
                }).join();
A
Alex Crichton 已提交
1378
                assert!(res.is_err());
A
Aaron Turon 已提交
1379
            });
A
Alex Crichton 已提交
1380 1381
            let _t = Thread::spawn(move|| {
                Thread::spawn(move|| {
1382
                    drop(tx);
A
Aaron Turon 已提交
1383
                });
1384
            });
B
Brendan Zabarauskas 已提交
1385
        }
A
Alex Crichton 已提交
1386
    }
A
Alex Crichton 已提交
1387

A
Alex Crichton 已提交
1388 1389
    #[test]
    fn oneshot_multi_thread_send_recv_stress() {
B
Brendan Zabarauskas 已提交
1390
        for _ in range(0, stress_factor()) {
1391
            let (tx, rx) = channel();
A
Alex Crichton 已提交
1392 1393
            let _t = Thread::spawn(move|| {
                tx.send(box 10i).unwrap();
1394
            });
A
Alex Crichton 已提交
1395
            assert!(rx.recv().unwrap() == box 10i);
B
Brendan Zabarauskas 已提交
1396
        }
A
Alex Crichton 已提交
1397
    }
A
Alex Crichton 已提交
1398

A
Alex Crichton 已提交
1399 1400
    #[test]
    fn stream_send_recv_stress() {
B
Brendan Zabarauskas 已提交
1401
        for _ in range(0, stress_factor()) {
1402
            let (tx, rx) = channel();
A
Alex Crichton 已提交
1403

1404 1405
            send(tx, 0);
            recv(rx, 0);
A
Alex Crichton 已提交
1406

1407
            fn send(tx: Sender<Box<int>>, i: int) {
A
Alex Crichton 已提交
1408 1409
                if i == 10 { return }

A
Alex Crichton 已提交
1410 1411
                Thread::spawn(move|| {
                    tx.send(box i).unwrap();
1412
                    send(tx, i + 1);
A
Aaron Turon 已提交
1413
                });
A
Alex Crichton 已提交
1414 1415
            }

1416
            fn recv(rx: Receiver<Box<int>>, i: int) {
A
Alex Crichton 已提交
1417 1418
                if i == 10 { return }

A
Alex Crichton 已提交
1419 1420
                Thread::spawn(move|| {
                    assert!(rx.recv().unwrap() == box i);
1421
                    recv(rx, i + 1);
A
Aaron Turon 已提交
1422
                });
A
Alex Crichton 已提交
1423
            }
B
Brendan Zabarauskas 已提交
1424
        }
A
Alex Crichton 已提交
1425
    }
A
Alex Crichton 已提交
1426

A
Alex Crichton 已提交
1427 1428
    // Queues 10000 unit messages and then drains all of them on the same
    // thread.
    #[test]
    fn recv_a_lot() {
        // Regression test that we don't run out of stack in scheduler context
        let (tx, rx) = channel();
        for _ in range(0i, 10000) { tx.send(()).unwrap(); }
        for _ in range(0i, 10000) { rx.recv().unwrap(); }
    }
A
Alex Crichton 已提交
1434

A
Alex Crichton 已提交
1435 1436
    // Many producers, one consumer: `stress_factor() + 100` threads each send
    // a single unit message through their own clone of the sender, and the
    // receiver must obtain one message per spawned thread.
    #[test]
    fn shared_chan_stress() {
        let (tx, rx) = channel();
        let total = stress_factor() + 100;
        for _ in range(0, total) {
            let tx = tx.clone();
            Thread::spawn(move|| {
                tx.send(()).unwrap();
            });
        }

        for _ in range(0, total) {
            rx.recv().unwrap();
        }
    }
A
Alex Crichton 已提交
1450

A
Alex Crichton 已提交
1451 1452
    // A spawned thread sums every value yielded by `rx.iter()` and reports the
    // total over a second channel. The sender is dropped before the total is
    // awaited so that the receiving loop can come to an end.
    #[test]
    fn test_nested_recv_iter() {
        let (tx, rx) = channel::<int>();
        let (total_tx, total_rx) = channel::<int>();

        let _t = Thread::spawn(move|| {
            let mut acc = 0;
            for x in rx.iter() {
                acc += x;
            }
            total_tx.send(acc).unwrap();
        });

        tx.send(3).unwrap();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        drop(tx);
        assert_eq!(total_rx.recv().unwrap(), 6);
    }
A
Alex Crichton 已提交
1470

A
Alex Crichton 已提交
1471 1472
    // Checks that breaking out of a `rx.iter()` loop early works.
    //
    // The receiving thread adds up values until the running count is at least
    // 3 and then breaks: with messages of 2 the count becomes 4 after two
    // messages and the loop exits when the third one is yielded.
    #[test]
    fn test_recv_iter_break() {
        let (tx, rx) = channel::<int>();
        let (count_tx, count_rx) = channel();

        let _t = Thread::spawn(move|| {
            let mut count = 0;
            for x in rx.iter() {
                if count >= 3 {
                    break;
                } else {
                    count += x;
                }
            }
            count_tx.send(count).unwrap();
        });

        tx.send(2).unwrap();
        tx.send(2).unwrap();
        tx.send(2).unwrap();
        // The receiver may already have left its loop and exited, so the
        // outcome of this last send is deliberately ignored.
        let _ = tx.send(2);
        drop(tx);
        assert_eq!(count_rx.recv().unwrap(), 4);
    }
1495

A
Alex Crichton 已提交
1496 1497
    // Walks `rx1.try_recv()` through Empty -> Ok(1) -> Empty -> Disconnected.
    //
    // The `tx2`/`rx2` and `tx3`/`rx3` channels are only handshakes: the main
    // thread signals the helper through `tx2`, and the helper acknowledges
    // through `tx3` once it has sent on (or dropped) `tx1`, so each
    // `try_recv` runs at a known point.
    #[test]
    fn try_recv_states() {
        let (tx1, rx1) = channel::<int>();
        let (tx2, rx2) = channel::<()>();
        let (tx3, rx3) = channel::<()>();
        let _t = Thread::spawn(move|| {
            rx2.recv().unwrap();
            tx1.send(1).unwrap();
            tx3.send(()).unwrap();
            rx2.recv().unwrap();
            drop(tx1);
            tx3.send(()).unwrap();
        });

        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Ok(1));
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
    }
1519

1520 1521
    // This bug used to end up in a livelock inside of the Receiver destructor
    // because the internal state of the Shared packet was corrupted
    //
    // The receiver starts waiting while only one sender exists; the sender is
    // then cloned (and the original dropped) before the message is sent, and
    // the receiver is dropped right after it wakes up.
    #[test]
    fn destroy_upgraded_shared_port_when_sender_still_active() {
        let (tx, rx) = channel();
        let (tx2, rx2) = channel();
        let _t = Thread::spawn(move|| {
            rx.recv().unwrap(); // wait on a oneshot
            drop(rx);  // destroy a shared
            tx2.send(()).unwrap();
        });
        // make sure the other task has gone to sleep
        for _ in range(0u, 5000) { Thread::yield_now(); }

        // upgrade to a shared chan and send a message
        let t = tx.clone();
        drop(tx);
        t.send(()).unwrap();

        // wait for the child task to exit before we exit
        rx2.recv().unwrap();
    }
A
Alex Crichton 已提交
1542
}
1543 1544 1545

#[cfg(test)]
mod sync_tests {
A
Alex Crichton 已提交
1546 1547
    use prelude::v1::*;

A
Aaron Turon 已提交
1548
    use os;
A
Alex Crichton 已提交
1549 1550
    use thread::Thread;
    use super::*;
1551 1552 1553

    pub fn stress_factor() -> uint {
        match os::getenv("RUST_TEST_STRESS") {
A
Alex Crichton 已提交
1554
            Some(val) => val.parse().unwrap(),
1555 1556 1557 1558
            None => 1,
        }
    }

A
Alex Crichton 已提交
1559 1560
    #[test]
    fn smoke() {
1561
        let (tx, rx) = sync_channel::<int>(1);
A
Alex Crichton 已提交
1562 1563 1564
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }
1565

A
Alex Crichton 已提交
1566 1567
    #[test]
    fn drop_full() {
1568
        let (tx, _rx) = sync_channel(1);
A
Alex Crichton 已提交
1569 1570
        tx.send(box 1i).unwrap();
    }
1571

A
Alex Crichton 已提交
1572 1573
    #[test]
    fn smoke_shared() {
1574
        let (tx, rx) = sync_channel::<int>(1);
A
Alex Crichton 已提交
1575 1576
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
1577
        let tx = tx.clone();
A
Alex Crichton 已提交
1578 1579 1580
        tx.send(1).unwrap();
        assert_eq!(rx.recv().unwrap(), 1);
    }
1581

A
Alex Crichton 已提交
1582 1583
    #[test]
    fn smoke_threads() {
1584
        let (tx, rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
1585 1586
        let _t = Thread::spawn(move|| {
            tx.send(1).unwrap();
1587
        });
A
Alex Crichton 已提交
1588 1589
        assert_eq!(rx.recv().unwrap(), 1);
    }
1590

A
Alex Crichton 已提交
1591 1592
    #[test]
    fn smoke_port_gone() {
1593
        let (tx, rx) = sync_channel::<int>(0);
1594
        drop(rx);
A
Alex Crichton 已提交
1595 1596
        assert!(tx.send(1).is_err());
    }
1597

A
Alex Crichton 已提交
1598 1599
    #[test]
    fn smoke_shared_port_gone2() {
1600
        let (tx, rx) = sync_channel::<int>(0);
1601 1602 1603
        drop(rx);
        let tx2 = tx.clone();
        drop(tx);
A
Alex Crichton 已提交
1604 1605
        assert!(tx2.send(1).is_err());
    }
1606

A
Alex Crichton 已提交
1607 1608
    #[test]
    fn port_gone_concurrent() {
1609
        let (tx, rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
1610 1611
        let _t = Thread::spawn(move|| {
            rx.recv().unwrap();
1612
        });
A
Alex Crichton 已提交
1613 1614
        while tx.send(1).is_ok() {}
    }
1615

A
Alex Crichton 已提交
1616 1617
    #[test]
    fn port_gone_concurrent_shared() {
1618
        let (tx, rx) = sync_channel::<int>(0);
1619
        let tx2 = tx.clone();
A
Alex Crichton 已提交
1620 1621
        let _t = Thread::spawn(move|| {
            rx.recv().unwrap();
1622
        });
A
Alex Crichton 已提交
1623 1624
        while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
    }
1625

A
Alex Crichton 已提交
1626 1627
    #[test]
    fn smoke_chan_gone() {
1628 1629
        let (tx, rx) = sync_channel::<int>(0);
        drop(tx);
A
Alex Crichton 已提交
1630 1631
        assert!(rx.recv().is_err());
    }
1632

A
Alex Crichton 已提交
1633 1634
    #[test]
    fn smoke_chan_gone_shared() {
1635 1636 1637 1638
        let (tx, rx) = sync_channel::<()>(0);
        let tx2 = tx.clone();
        drop(tx);
        drop(tx2);
A
Alex Crichton 已提交
1639 1640
        assert!(rx.recv().is_err());
    }
1641

A
Alex Crichton 已提交
1642 1643
    #[test]
    fn chan_gone_concurrent() {
1644
        let (tx, rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
1645 1646 1647
        Thread::spawn(move|| {
            tx.send(1).unwrap();
            tx.send(1).unwrap();
A
Aaron Turon 已提交
1648
        });
A
Alex Crichton 已提交
1649 1650
        while rx.recv().is_ok() {}
    }
1651

A
Alex Crichton 已提交
1652 1653
    #[test]
    fn stress() {
1654
        let (tx, rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
1655 1656
        Thread::spawn(move|| {
            for _ in range(0u, 10000) { tx.send(1).unwrap(); }
A
Aaron Turon 已提交
1657
        });
1658
        for _ in range(0u, 10000) {
A
Alex Crichton 已提交
1659
            assert_eq!(rx.recv().unwrap(), 1);
1660
        }
A
Alex Crichton 已提交
1661
    }
1662

A
Alex Crichton 已提交
1663 1664
    #[test]
    fn stress_shared() {
1665 1666 1667 1668 1669
        static AMT: uint = 1000;
        static NTHREADS: uint = 8;
        let (tx, rx) = sync_channel::<int>(0);
        let (dtx, drx) = sync_channel::<()>(0);

A
Alex Crichton 已提交
1670
        Thread::spawn(move|| {
1671
            for _ in range(0, AMT * NTHREADS) {
A
Alex Crichton 已提交
1672
                assert_eq!(rx.recv().unwrap(), 1);
1673 1674
            }
            match rx.try_recv() {
S
Steve Klabnik 已提交
1675
                Ok(..) => panic!(),
1676 1677
                _ => {}
            }
A
Alex Crichton 已提交
1678
            dtx.send(()).unwrap();
A
Aaron Turon 已提交
1679
        });
1680 1681 1682

        for _ in range(0, NTHREADS) {
            let tx = tx.clone();
A
Alex Crichton 已提交
1683 1684
            Thread::spawn(move|| {
                for _ in range(0, AMT) { tx.send(1).unwrap(); }
A
Aaron Turon 已提交
1685
            });
1686 1687
        }
        drop(tx);
A
Alex Crichton 已提交
1688 1689
        drx.recv().unwrap();
    }
1690

A
Alex Crichton 已提交
1691 1692
    #[test]
    fn oneshot_single_thread_close_port_first() {
1693 1694 1695
        // Simple test of closing without sending
        let (_tx, rx) = sync_channel::<int>(0);
        drop(rx);
A
Alex Crichton 已提交
1696
    }
1697

A
Alex Crichton 已提交
1698 1699
    #[test]
    fn oneshot_single_thread_close_chan_first() {
1700 1701 1702
        // Simple test of closing without sending
        let (tx, _rx) = sync_channel::<int>(0);
        drop(tx);
A
Alex Crichton 已提交
1703
    }
1704

A
Alex Crichton 已提交
1705 1706
    #[test]
    fn oneshot_single_thread_send_port_close() {
1707
        // Testing that the sender cleans up the payload if receiver is closed
1708
        let (tx, rx) = sync_channel::<Box<int>>(0);
1709
        drop(rx);
A
Alex Crichton 已提交
1710 1711
        assert!(tx.send(box 0).is_err());
    }
1712

A
Alex Crichton 已提交
1713 1714
    #[test]
    fn oneshot_single_thread_recv_chan_close() {
S
Steve Klabnik 已提交
1715
        // Receiving on a closed chan will panic
A
Aaron Turon 已提交
1716
        let res = Thread::scoped(move|| {
1717 1718
            let (tx, rx) = sync_channel::<int>(0);
            drop(tx);
A
Alex Crichton 已提交
1719
            rx.recv().unwrap();
A
Aaron Turon 已提交
1720
        }).join();
1721 1722
        // What is our res?
        assert!(res.is_err());
A
Alex Crichton 已提交
1723
    }
1724

A
Alex Crichton 已提交
1725 1726
    #[test]
    fn oneshot_single_thread_send_then_recv() {
1727
        let (tx, rx) = sync_channel::<Box<int>>(1);
A
Alex Crichton 已提交
1728 1729 1730
        tx.send(box 10).unwrap();
        assert!(rx.recv().unwrap() == box 10);
    }
1731

A
Alex Crichton 已提交
1732 1733
    #[test]
    fn oneshot_single_thread_try_send_open() {
1734
        let (tx, rx) = sync_channel::<int>(1);
1735
        assert_eq!(tx.try_send(10), Ok(()));
A
Alex Crichton 已提交
1736 1737
        assert!(rx.recv().unwrap() == 10);
    }
1738

A
Alex Crichton 已提交
1739 1740
    #[test]
    fn oneshot_single_thread_try_send_closed() {
1741 1742
        let (tx, rx) = sync_channel::<int>(0);
        drop(rx);
A
Alex Crichton 已提交
1743 1744
        assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
    }
1745

A
Alex Crichton 已提交
1746 1747
    #[test]
    fn oneshot_single_thread_try_send_closed2() {
1748
        let (tx, _rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
1749 1750
        assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
    }
1751

A
Alex Crichton 已提交
1752 1753
    #[test]
    fn oneshot_single_thread_try_recv_open() {
1754
        let (tx, rx) = sync_channel::<int>(1);
A
Alex Crichton 已提交
1755 1756 1757
        tx.send(10).unwrap();
        assert!(rx.recv() == Ok(10));
    }
1758

A
Alex Crichton 已提交
1759 1760
    #[test]
    fn oneshot_single_thread_try_recv_closed() {
1761 1762
        let (tx, rx) = sync_channel::<int>(0);
        drop(tx);
A
Alex Crichton 已提交
1763 1764
        assert!(rx.recv().is_err());
    }
1765

A
Alex Crichton 已提交
1766 1767
    #[test]
    fn oneshot_single_thread_peek_data() {
1768
        let (tx, rx) = sync_channel::<int>(1);
A
Alex Crichton 已提交
1769 1770
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        tx.send(10).unwrap();
1771
        assert_eq!(rx.try_recv(), Ok(10));
A
Alex Crichton 已提交
1772
    }
1773

A
Alex Crichton 已提交
1774 1775
    #[test]
    fn oneshot_single_thread_peek_close() {
1776 1777
        let (tx, rx) = sync_channel::<int>(0);
        drop(tx);
A
Alex Crichton 已提交
1778 1779 1780
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    }
1781

A
Alex Crichton 已提交
1782 1783
    #[test]
    fn oneshot_single_thread_peek_open() {
1784
        let (_tx, rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
1785 1786
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    }
1787

A
Alex Crichton 已提交
1788 1789
    #[test]
    fn oneshot_multi_task_recv_then_send() {
1790
        let (tx, rx) = sync_channel::<Box<int>>(0);
A
Alex Crichton 已提交
1791 1792
        let _t = Thread::spawn(move|| {
            assert!(rx.recv().unwrap() == box 10);
1793 1794
        });

A
Alex Crichton 已提交
1795 1796
        tx.send(box 10).unwrap();
    }
1797

A
Alex Crichton 已提交
1798 1799
    #[test]
    fn oneshot_multi_task_recv_then_close() {
1800
        let (tx, rx) = sync_channel::<Box<int>>(0);
A
Alex Crichton 已提交
1801
        let _t = Thread::spawn(move|| {
1802 1803
            drop(tx);
        });
A
Aaron Turon 已提交
1804
        let res = Thread::scoped(move|| {
A
Alex Crichton 已提交
1805
            assert!(rx.recv().unwrap() == box 10);
A
Aaron Turon 已提交
1806
        }).join();
1807
        assert!(res.is_err());
A
Alex Crichton 已提交
1808
    }
1809

A
Alex Crichton 已提交
1810 1811
    #[test]
    fn oneshot_multi_thread_close_stress() {
1812 1813
        for _ in range(0, stress_factor()) {
            let (tx, rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
1814
            let _t = Thread::spawn(move|| {
1815 1816 1817 1818
                drop(rx);
            });
            drop(tx);
        }
A
Alex Crichton 已提交
1819
    }
1820

A
Alex Crichton 已提交
1821 1822
    #[test]
    fn oneshot_multi_thread_send_close_stress() {
1823 1824
        for _ in range(0, stress_factor()) {
            let (tx, rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
1825
            let _t = Thread::spawn(move|| {
1826 1827
                drop(rx);
            });
A
Aaron Turon 已提交
1828
            let _ = Thread::scoped(move || {
A
Alex Crichton 已提交
1829
                tx.send(1).unwrap();
A
Aaron Turon 已提交
1830
            }).join();
1831
        }
A
Alex Crichton 已提交
1832
    }
1833

A
Alex Crichton 已提交
1834 1835
    #[test]
    fn oneshot_multi_thread_recv_close_stress() {
1836 1837
        for _ in range(0, stress_factor()) {
            let (tx, rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
1838
            let _t = Thread::spawn(move|| {
A
Aaron Turon 已提交
1839
                let res = Thread::scoped(move|| {
A
Alex Crichton 已提交
1840
                    rx.recv().unwrap();
A
Aaron Turon 已提交
1841
                }).join();
1842 1843
                assert!(res.is_err());
            });
A
Alex Crichton 已提交
1844 1845
            let _t = Thread::spawn(move|| {
                Thread::spawn(move|| {
1846
                    drop(tx);
A
Aaron Turon 已提交
1847
                });
1848 1849
            });
        }
A
Alex Crichton 已提交
1850
    }
1851

A
Alex Crichton 已提交
1852 1853
    #[test]
    fn oneshot_multi_thread_send_recv_stress() {
1854
        for _ in range(0, stress_factor()) {
1855
            let (tx, rx) = sync_channel::<Box<int>>(0);
A
Alex Crichton 已提交
1856 1857
            let _t = Thread::spawn(move|| {
                tx.send(box 10i).unwrap();
1858
            });
A
Alex Crichton 已提交
1859
            assert!(rx.recv().unwrap() == box 10i);
1860
        }
A
Alex Crichton 已提交
1861
    }
1862

A
Alex Crichton 已提交
1863 1864
    #[test]
    fn stream_send_recv_stress() {
1865
        for _ in range(0, stress_factor()) {
1866
            let (tx, rx) = sync_channel::<Box<int>>(0);
1867 1868 1869 1870

            send(tx, 0);
            recv(rx, 0);

1871
            fn send(tx: SyncSender<Box<int>>, i: int) {
1872 1873
                if i == 10 { return }

A
Alex Crichton 已提交
1874 1875
                Thread::spawn(move|| {
                    tx.send(box i).unwrap();
1876
                    send(tx, i + 1);
A
Aaron Turon 已提交
1877
                });
1878 1879
            }

1880
            fn recv(rx: Receiver<Box<int>>, i: int) {
1881 1882
                if i == 10 { return }

A
Alex Crichton 已提交
1883 1884
                Thread::spawn(move|| {
                    assert!(rx.recv().unwrap() == box i);
1885
                    recv(rx, i + 1);
A
Aaron Turon 已提交
1886
                });
1887 1888
            }
        }
A
Alex Crichton 已提交
1889
    }
1890

A
Alex Crichton 已提交
1891 1892
    #[test]
    fn recv_a_lot() {
1893 1894
        // Regression test that we don't run out of stack in scheduler context
        let (tx, rx) = sync_channel(10000);
A
Alex Crichton 已提交
1895 1896 1897
        for _ in range(0u, 10000) { tx.send(()).unwrap(); }
        for _ in range(0u, 10000) { rx.recv().unwrap(); }
    }
1898

A
Alex Crichton 已提交
1899 1900
    #[test]
    fn shared_chan_stress() {
1901 1902 1903 1904
        let (tx, rx) = sync_channel(0);
        let total = stress_factor() + 100;
        for _ in range(0, total) {
            let tx = tx.clone();
A
Alex Crichton 已提交
1905 1906
            Thread::spawn(move|| {
                tx.send(()).unwrap();
A
Aaron Turon 已提交
1907
            });
1908 1909 1910
        }

        for _ in range(0, total) {
A
Alex Crichton 已提交
1911
            rx.recv().unwrap();
1912
        }
A
Alex Crichton 已提交
1913
    }
1914

A
Alex Crichton 已提交
1915 1916
    #[test]
    fn test_nested_recv_iter() {
1917 1918 1919
        let (tx, rx) = sync_channel::<int>(0);
        let (total_tx, total_rx) = sync_channel::<int>(0);

A
Alex Crichton 已提交
1920
        let _t = Thread::spawn(move|| {
1921 1922 1923 1924
            let mut acc = 0;
            for x in rx.iter() {
                acc += x;
            }
A
Alex Crichton 已提交
1925
            total_tx.send(acc).unwrap();
1926 1927
        });

A
Alex Crichton 已提交
1928 1929 1930
        tx.send(3).unwrap();
        tx.send(1).unwrap();
        tx.send(2).unwrap();
1931
        drop(tx);
A
Alex Crichton 已提交
1932 1933
        assert_eq!(total_rx.recv().unwrap(), 6);
    }
1934

A
Alex Crichton 已提交
1935 1936
    #[test]
    fn test_recv_iter_break() {
1937 1938 1939
        let (tx, rx) = sync_channel::<int>(0);
        let (count_tx, count_rx) = sync_channel(0);

A
Alex Crichton 已提交
1940
        let _t = Thread::spawn(move|| {
1941 1942 1943 1944 1945 1946 1947 1948
            let mut count = 0;
            for x in rx.iter() {
                if count >= 3 {
                    break;
                } else {
                    count += x;
                }
            }
A
Alex Crichton 已提交
1949
            count_tx.send(count).unwrap();
1950 1951
        });

A
Alex Crichton 已提交
1952 1953 1954
        tx.send(2).unwrap();
        tx.send(2).unwrap();
        tx.send(2).unwrap();
1955
        let _ = tx.try_send(2);
1956
        drop(tx);
A
Alex Crichton 已提交
1957 1958
        assert_eq!(count_rx.recv().unwrap(), 4);
    }
1959

A
Alex Crichton 已提交
1960 1961
    #[test]
    fn try_recv_states() {
1962 1963 1964
        let (tx1, rx1) = sync_channel::<int>(1);
        let (tx2, rx2) = sync_channel::<()>(1);
        let (tx3, rx3) = sync_channel::<()>(1);
A
Alex Crichton 已提交
1965 1966 1967 1968 1969
        let _t = Thread::spawn(move|| {
            rx2.recv().unwrap();
            tx1.send(1).unwrap();
            tx3.send(()).unwrap();
            rx2.recv().unwrap();
1970
            drop(tx1);
A
Alex Crichton 已提交
1971
            tx3.send(()).unwrap();
1972 1973
        });

A
Alex Crichton 已提交
1974 1975 1976
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
1977
        assert_eq!(rx1.try_recv(), Ok(1));
A
Alex Crichton 已提交
1978 1979 1980 1981 1982
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
        tx2.send(()).unwrap();
        rx3.recv().unwrap();
        assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
    }
1983 1984 1985

    // This bug used to end up in a livelock inside of the Receiver destructor
    // because the internal state of the Shared packet was corrupted
A
Alex Crichton 已提交
1986 1987
    #[test]
    fn destroy_upgraded_shared_port_when_sender_still_active() {
1988 1989
        let (tx, rx) = sync_channel::<()>(0);
        let (tx2, rx2) = sync_channel::<()>(0);
A
Alex Crichton 已提交
1990 1991
        let _t = Thread::spawn(move|| {
            rx.recv().unwrap(); // wait on a oneshot
1992
            drop(rx);  // destroy a shared
A
Alex Crichton 已提交
1993
            tx2.send(()).unwrap();
1994 1995
        });
        // make sure the other task has gone to sleep
A
Aaron Turon 已提交
1996
        for _ in range(0u, 5000) { Thread::yield_now(); }
1997 1998 1999 2000

        // upgrade to a shared chan and send a message
        let t = tx.clone();
        drop(tx);
A
Alex Crichton 已提交
2001
        t.send(()).unwrap();
2002 2003

        // wait for the child task to exit before we exit
A
Alex Crichton 已提交
2004 2005
        rx2.recv().unwrap();
    }
2006

A
Alex Crichton 已提交
2007 2008
    #[test]
    fn send1() {
2009
        let (tx, rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
2010 2011 2012
        let _t = Thread::spawn(move|| { rx.recv().unwrap(); });
        assert_eq!(tx.send(1), Ok(()));
    }
2013

A
Alex Crichton 已提交
2014 2015
    #[test]
    fn send2() {
2016
        let (tx, rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
2017 2018 2019
        let _t = Thread::spawn(move|| { drop(rx); });
        assert!(tx.send(1).is_err());
    }
2020

A
Alex Crichton 已提交
2021 2022
    #[test]
    fn send3() {
2023
        let (tx, rx) = sync_channel::<int>(1);
A
Alex Crichton 已提交
2024 2025 2026 2027
        assert_eq!(tx.send(1), Ok(()));
        let _t =Thread::spawn(move|| { drop(rx); });
        assert!(tx.send(1).is_err());
    }
2028

A
Alex Crichton 已提交
2029 2030
    #[test]
    fn send4() {
2031
        let (tx, rx) = sync_channel::<int>(0);
2032 2033 2034
        let tx2 = tx.clone();
        let (done, donerx) = channel();
        let done2 = done.clone();
A
Alex Crichton 已提交
2035 2036 2037
        let _t = Thread::spawn(move|| {
            assert!(tx.send(1).is_err());
            done.send(()).unwrap();
2038
        });
A
Alex Crichton 已提交
2039 2040 2041
        let _t = Thread::spawn(move|| {
            assert!(tx2.send(2).is_err());
            done2.send(()).unwrap();
2042 2043
        });
        drop(rx);
A
Alex Crichton 已提交
2044 2045 2046
        donerx.recv().unwrap();
        donerx.recv().unwrap();
    }
2047

A
Alex Crichton 已提交
2048 2049
    #[test]
    fn try_send1() {
2050
        let (tx, _rx) = sync_channel::<int>(0);
A
Alex Crichton 已提交
2051 2052
        assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
    }
2053

A
Alex Crichton 已提交
2054 2055
    #[test]
    fn try_send2() {
2056
        let (tx, _rx) = sync_channel::<int>(1);
2057
        assert_eq!(tx.try_send(1), Ok(()));
A
Alex Crichton 已提交
2058 2059
        assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
    }
2060

A
Alex Crichton 已提交
2061 2062
    #[test]
    fn try_send3() {
2063
        let (tx, rx) = sync_channel::<int>(1);
2064
        assert_eq!(tx.try_send(1), Ok(()));
2065
        drop(rx);
A
Alex Crichton 已提交
2066 2067
        assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
    }
S
Steve Klabnik 已提交
2068

A
Alex Crichton 已提交
2069 2070
    #[test]
    fn issue_15761() {
2071 2072 2073 2074
        fn repro() {
            let (tx1, rx1) = sync_channel::<()>(3);
            let (tx2, rx2) = sync_channel::<()>(3);

A
Alex Crichton 已提交
2075 2076
            let _t = Thread::spawn(move|| {
                rx1.recv().unwrap();
2077 2078 2079 2080
                tx2.try_send(()).unwrap();
            });

            tx1.try_send(()).unwrap();
A
Alex Crichton 已提交
2081
            rx2.recv().unwrap();
2082 2083 2084 2085 2086
        }

        for _ in range(0u, 100) {
            repro()
        }
A
Alex Crichton 已提交
2087
    }
2088
}