// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

/**
 * The concurrency primitives you know and love.
 *
 * Maybe once we have a "core exports x only to std" mechanism, these can be
 * in std.
 */


use std::borrow;
use std::comm;
use std::comm::SendDeferred;
use std::comm::{GenericPort, Peekable};
use std::task;
use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox};
use std::unstable::atomics;
use std::unstable::finally::Finally;
use std::util;
use std::util::NonCopyable;

/****************************************************************************
 * Internals
 ****************************************************************************/

// Each waiting task receives on one of these.
#[doc(hidden)]
type WaitEnd = comm::PortOne<()>;
#[doc(hidden)]
type SignalEnd = comm::ChanOne<()>;
// A doubly-ended queue of waiting tasks.
#[doc(hidden)]
struct WaitQueue { head: comm::Port<SignalEnd>,
                   tail: comm::Chan<SignalEnd> }

impl WaitQueue {
    fn new() -> WaitQueue {
        let (block_head, block_tail) = comm::stream();
        WaitQueue { head: block_head, tail: block_tail }
    }

    // Signals one live task from the queue.
    fn signal(&self) -> bool {
        // The peek is mandatory to make sure recv doesn't block.
        if self.head.peek() {
            // Pop and send a wakeup signal. If the waiter was killed, its port
            // will have closed. Keep trying until we get a live task.
            if self.head.recv().try_send_deferred(()) {
                true
            } else {
                self.signal()
            }
        } else {
            false
        }
    }

    fn broadcast(&self) -> uint {
        let mut count = 0;
        while self.head.peek() {
            if self.head.recv().try_send_deferred(()) {
                count += 1;
            }
        }
        count
    }

    fn wait_end(&self) -> WaitEnd {
        let (wait_end, signal_end) = comm::oneshot();
        self.tail.send_deferred(signal_end);
        wait_end
    }
}

// The building-block used to make semaphores, mutexes, and rwlocks.
#[doc(hidden)]
struct SemInner<Q> {
    count: int,
    waiters:   WaitQueue,
    // Can be either unit or another waitqueue. Some sems shouldn't come with
    // a condition variable attached, others should.
    blocked:   Q
}

#[doc(hidden)]
struct Sem<Q>(Exclusive<SemInner<Q>>);

#[doc(hidden)]
impl<Q:Send> Sem<Q> {
    fn new(count: int, q: Q) -> Sem<Q> {
        Sem(Exclusive::new(SemInner {
            count: count, waiters: WaitQueue::new(), blocked: q }))
    }

    pub fn acquire(&self) {
        unsafe {
            let mut waiter_nobe = None;
            do (**self).with |state| {
                state.count -= 1;
                if state.count < 0 {
                    // Create waiter nobe, enqueue ourself, and tell
                    // outer scope we need to block.
                    waiter_nobe = Some(state.waiters.wait_end());
                }
            }
            // Uncomment if you wish to test for sem races. Not valgrind-friendly.
            /* do 1000.times { task::deschedule(); } */
            // Need to wait outside the exclusive.
            if waiter_nobe.is_some() {
                let _ = waiter_nobe.unwrap().recv();
            }
        }
    }

    pub fn release(&self) {
        unsafe {
            do (**self).with |state| {
                state.count += 1;
                if state.count <= 0 {
                    state.waiters.signal();
                }
            }
        }
    }

    pub fn access<U>(&self, blk: &fn() -> U) -> U {
        do task::unkillable {
            do (|| {
                self.acquire();
                unsafe {
                    do task::rekillable { blk() }
                }
            }).finally {
                self.release();
            }
        }
    }
}

#[doc(hidden)]
impl Sem<~[WaitQueue]> {
    fn new_and_signal(count: int, num_condvars: uint)
        -> Sem<~[WaitQueue]> {
        let mut queues = ~[];
        do num_condvars.times {
            queues.push(WaitQueue::new());
        }
        Sem::new(count, queues)
    }
}

// FIXME(#3598): Want to use an Option down below, but we need a custom enum
// that's not polymorphic to get around the fact that lifetimes are invariant
// inside of type parameters.
enum ReacquireOrderLock<'self> {
    Nothing, // c.c
    Just(&'self Semaphore),
}

/// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
pub struct Condvar<'self> {
    // The 'Sem' object associated with this condvar. This is the one that's
    // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
    priv sem: &'self Sem<~[WaitQueue]>,
    // This is (can be) an extra semaphore which is held around the reacquire
    // operation on the first one. This is only used in cvars associated with
    // rwlocks, and is needed to ensure that, when a downgrader is trying to
    // hand off the access lock (which would be the first field, here), a 2nd
    // writer waking up from a cvar wait can't race with a reader to steal it.
    // See the comment in write_cond for more detail.
    priv order: ReacquireOrderLock<'self>,
    // Make sure condvars are non-copyable.
    priv token: util::NonCopyable,
}

impl<'self> Condvar<'self> {
    /**
     * Atomically drop the associated lock, and block until a signal is sent.
     *
     * # Failure
     * A task which is killed (i.e., by linked failure with another task)
     * while waiting on a condition variable will wake up, fail, and unlock
     * the associated lock as it unwinds.
     */
    pub fn wait(&self) { self.wait_on(0) }

    /**
     * As wait(), but can specify which of multiple condition variables to
     * wait on. Only a signal_on() or broadcast_on() with the same condvar_id
     * will wake this thread.
     *
     * The associated lock must have been initialised with an appropriate
     * number of condvars. The condvar_id must be between 0 and num_condvars-1
     * or else this call will fail.
     *
     * wait() is equivalent to wait_on(0).
     */
    pub fn wait_on(&self, condvar_id: uint) {
        let mut WaitEnd = None;
        let mut out_of_bounds = None;
        do task::unkillable {
            // Release lock, 'atomically' enqueuing ourselves in so doing.
            unsafe {
                do (**self.sem).with |state| {
                    if condvar_id < state.blocked.len() {
                        // Drop the lock.
                        state.count += 1;
                        if state.count <= 0 {
                            state.waiters.signal();
                        }
                        // Create waiter nobe, and enqueue ourself to
                        // be woken up by a signaller.
                        WaitEnd = Some(state.blocked[condvar_id].wait_end());
                    } else {
                        out_of_bounds = Some(state.blocked.len());
                    }
                }
            }

            // If deschedule checks start getting inserted anywhere, we can be
            // killed before or after enqueueing. Deciding whether to
            // unkillably reacquire the lock needs to happen atomically
            // wrt enqueuing.
            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
                // Unconditionally "block". (Might not actually block if a
                // signaller already sent -- I mean 'unconditionally' in contrast
                // with acquire().)
                do (|| {
                    unsafe {
                        do task::rekillable {
                            let _ = WaitEnd.take_unwrap().recv();
                        }
                    }
                }).finally {
                    // Reacquire the condvar. Note this is back in the unkillable
                    // section; it needs to succeed, instead of itself dying.
                    match self.order {
                        Just(lock) => do lock.access {
                            self.sem.acquire();
                        },
                        Nothing => {
                            self.sem.acquire();
                        },
                    }
                }
            }
        }
    }

    /// Wake up a blocked task. Returns false if there was no blocked task.
    pub fn signal(&self) -> bool { self.signal_on(0) }

    /// As signal, but with a specified condvar_id. See wait_on.
    pub fn signal_on(&self, condvar_id: uint) -> bool {
        unsafe {
            let mut out_of_bounds = None;
            let mut result = false;
            do (**self.sem).with |state| {
                if condvar_id < state.blocked.len() {
                    result = state.blocked[condvar_id].signal();
                } else {
                    out_of_bounds = Some(state.blocked.len());
                }
            }
            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {
                result
            }
        }
    }

    /// Wake up all blocked tasks. Returns the number of tasks woken.
    pub fn broadcast(&self) -> uint { self.broadcast_on(0) }

    /// As broadcast, but with a specified condvar_id. See wait_on.
    pub fn broadcast_on(&self, condvar_id: uint) -> uint {
        let mut out_of_bounds = None;
        let mut queue = None;
        unsafe {
            do (**self.sem).with |state| {
                if condvar_id < state.blocked.len() {
                    // To avoid :broadcast_heavy, we make a new waitqueue,
                    // swap it out with the old one, and broadcast on the
                    // old one outside of the little-lock.
                    queue = Some(util::replace(&mut state.blocked[condvar_id],
                                               WaitQueue::new()));
                } else {
                    out_of_bounds = Some(state.blocked.len());
                }
            }
            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.broadcast_on()") {
                let queue = queue.take_unwrap();
                queue.broadcast()
            }
        }
    }
}
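
// A minimal usage sketch (mirroring test_mutex_cond_wait in the tests below):
// a Condvar is only handed out by lock_cond/access_cond, so wait() and
// signal() always run with the associated lock held.
//
//     let m = ~Mutex::new();
//     do m.lock_cond |cond| {
//         let m2 = ~m.clone();
//         do task::spawn {
//             do m2.lock_cond |cond| { cond.signal(); }
//         }
//         cond.wait(); // atomically unlocks, blocks, then reacquires
//     }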

// Checks whether a condvar ID was out of bounds, and fails if so, or does
// something else next on success.
#[inline]
#[doc(hidden)]
fn check_cvar_bounds<U>(out_of_bounds: Option<uint>, id: uint, act: &str,
                        blk: &fn() -> U) -> U {
    match out_of_bounds {
        Some(0) =>
            fail!("%s with illegal ID %u - this lock has no condvars!", act, id),
        Some(length) =>
            fail!("%s with illegal ID %u - ID must be less than %u", act, id, length),
        None => blk()
    }
}

#[doc(hidden)]
impl Sem<~[WaitQueue]> {
    // The only other places that condvars get built are rwlock.write_cond()
    // and rwlock_write_mode.
    pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        do self.access {
            blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() })
        }
    }
}

/****************************************************************************
 * Semaphores
 ****************************************************************************/

/// A counting, blocking, bounded-waiting semaphore.
struct Semaphore { priv sem: Sem<()> }


impl Clone for Semaphore {
    /// Create a new handle to the semaphore.
    fn clone(&self) -> Semaphore {
        Semaphore { sem: Sem((*self.sem).clone()) }
    }
}

impl Semaphore {
    /// Create a new semaphore with the specified count.
    pub fn new(count: int) -> Semaphore {
        Semaphore { sem: Sem::new(count, ()) }
    }

    /**
     * Acquire a resource represented by the semaphore. Blocks if necessary
     * until resource(s) become available.
     */
    pub fn acquire(&self) { (&self.sem).acquire() }

    /**
     * Release a held resource represented by the semaphore. Wakes a blocked
     * contending task, if any exist. Won't block the caller.
     */
    pub fn release(&self) { (&self.sem).release() }

    /// Run a function with ownership of one of the semaphore's resources.
    pub fn access<U>(&self, blk: &fn() -> U) -> U { (&self.sem).access(blk) }
}
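
// A minimal usage sketch (in the style of test_sem_as_mutex below): a
// semaphore created with count 1 behaves like a mutex, and access() holds
// one unit of the resource for the duration of the block.
//
//     let s = ~Semaphore::new(1);
//     let s2 = ~s.clone();
//     do task::spawn {
//         do s2.access { /* critical section */ }
//     }
//     do s.access { /* critical section */ }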

/****************************************************************************
 * Mutexes
 ****************************************************************************/

/**
 * A blocking, bounded-waiting, mutual exclusion lock with an associated
 * FIFO condition variable.
 *
 * # Failure
 * A task which fails while holding a mutex will unlock the mutex as it
 * unwinds.
 */
pub struct Mutex { priv sem: Sem<~[WaitQueue]> }

impl Clone for Mutex {
    /// Create a new handle to the mutex.
    fn clone(&self) -> Mutex { Mutex { sem: Sem((*self.sem).clone()) } }
}

impl Mutex {
    /// Create a new mutex, with one associated condvar.
    pub fn new() -> Mutex { Mutex::new_with_condvars(1) }

    /**
    * Create a new mutex, with a specified number of associated condvars. This
    * will allow calling wait_on/signal_on/broadcast_on with condvar IDs between
    * 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be allowed but
    * any operations on the condvar will fail.)
    */
    pub fn new_with_condvars(num_condvars: uint) -> Mutex {
        Mutex { sem: Sem::new_and_signal(1, num_condvars) }
    }


    /// Run a function with ownership of the mutex.
    pub fn lock<U>(&self, blk: &fn() -> U) -> U {
        (&self.sem).access(blk)
    }

    /// Run a function with ownership of the mutex and a handle to a condvar.
    pub fn lock_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        (&self.sem).access_cond(blk)
    }
}
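
// A minimal usage sketch (following test_mutex_lock below): lock() gives
// exclusive access for the duration of the block, and lock_cond()
// additionally passes in a condvar handle tied to this mutex.
//
//     let m = ~Mutex::new();
//     let m2 = m.clone();
//     do task::spawn {
//         do m2.lock { /* exclusive access */ }
//     }
//     do m.lock_cond |cond| {
//         let _ = cond.signal(); // false if no task was waiting
//     }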

/****************************************************************************
 * Reader-writer locks
 ****************************************************************************/

// NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem

#[doc(hidden)]
struct RWLockInner {
    // You might ask, "Why don't you need to use an atomic for the mode flag?"
    // This flag affects the behaviour of readers (for plain readers, they
    // assert on it; for downgraders, they use it to decide which mode to
    // unlock for). Consider that the flag is only unset when the very last
    // reader exits; therefore, it can never be unset during a reader/reader
    // (or reader/downgrader) race.
    // By the way, if we didn't care about the assert in the read unlock path,
    // we could instead store the mode flag in write_downgrade's stack frame,
    // and have the downgrade tokens store a borrowed pointer to it.
    read_mode:  bool,
    // The only way the count flag is ever accessed is with xadd. Since it is
    // a read-modify-write operation, multiple xadds on different cores will
    // always be consistent with respect to each other, so a monotonic/relaxed
    // consistency ordering suffices (i.e., no extra barriers are needed).
    // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
    // acquire/release orderings superfluously. Change these someday.
    read_count: atomics::AtomicUint,
}

/**
 * A blocking, no-starvation, reader-writer lock with an associated condvar.
 *
 * # Failure
 * A task which fails while holding an rwlock will unlock the rwlock as it
 * unwinds.
 */
pub struct RWLock {
    priv order_lock:  Semaphore,
    priv access_lock: Sem<~[WaitQueue]>,
    priv state:       UnsafeAtomicRcBox<RWLockInner>,
}

impl RWLock {
    /// Create a new rwlock, with one associated condvar.
    pub fn new() -> RWLock { RWLock::new_with_condvars(1) }

    /**
    * Create a new rwlock, with a specified number of associated condvars.
    * Similar to mutex_with_condvars.
    */
    pub fn new_with_condvars(num_condvars: uint) -> RWLock {
        let state = UnsafeAtomicRcBox::new(RWLockInner {
            read_mode:  false,
            read_count: atomics::AtomicUint::new(0),
        });
        RWLock { order_lock:  Semaphore::new(1),
                access_lock: Sem::new_and_signal(1, num_condvars),
                state:       state, }
    }

    /// Create a new handle to the rwlock.
    pub fn clone(&self) -> RWLock {
        RWLock { order_lock:  (&(self.order_lock)).clone(),
                 access_lock: Sem((*self.access_lock).clone()),
                 state:       self.state.clone() }
    }

    /**
     * Run a function with the rwlock in read mode. Calls to 'read' from other
     * tasks may run concurrently with this one.
     */
    pub fn read<U>(&self, blk: &fn() -> U) -> U {
        unsafe {
            do task::unkillable {
                do (&self.order_lock).access {
                    let state = &mut *self.state.get();
                    let old_count = state.read_count.fetch_add(1, atomics::Acquire);
                    if old_count == 0 {
                        (&self.access_lock).acquire();
                        state.read_mode = true;
                    }
                }
                do (|| {
                    do task::rekillable { blk() }
                }).finally {
                    let state = &mut *self.state.get();
                    assert!(state.read_mode);
                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
                    assert!(old_count > 0);
                    if old_count == 1 {
                        state.read_mode = false;
                        // Note: this release used to be outside of a locked access
                        // to exclusive-protected state. If this code is ever
                        // converted back to such (instead of using atomic ops),
                        // this access MUST NOT go inside the exclusive access.
                        (&self.access_lock).release();
                    }
                }
            }
        }
    }

    /**
     * Run a function with the rwlock in write mode. No calls to 'read' or
     * 'write' from other tasks will run concurrently with this one.
     */
    pub fn write<U>(&self, blk: &fn() -> U) -> U {
        unsafe {
            do task::unkillable {
                (&self.order_lock).acquire();
                do (&self.access_lock).access {
                    (&self.order_lock).release();
                    do task::rekillable {
                        blk()
                    }
                }
            }
        }
    }

    /**
     * As write(), but also with a handle to a condvar. Waiting on this
     * condvar will allow readers and writers alike to take the rwlock before
     * the waiting task is signalled. (Note: a writer that waited and then
     * was signalled might reacquire the lock before other waiting writers.)
     */
    pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        // It's important to thread our order lock into the condvar, so that
        // when a cond.wait() wakes up, it uses it while reacquiring the
        // access lock. If we permitted a waking-up writer to "cut in line",
        // there could arise a subtle race when a downgrader attempts to hand
        // off the reader cloud lock to a waiting reader. This race is tested
        // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
        // T1 (writer)              T2 (downgrader)             T3 (reader)
        // [in cond.wait()]
        //                          [locks for writing]
        //                          [holds access_lock]
        // [is signalled, perhaps by
        //  downgrader or a 4th thread]
        // tries to lock access(!)
        //                                                      lock order_lock
        //                                                      xadd read_count[0->1]
        //                                                      tries to lock access
        //                          [downgrade]
        //                          xadd read_count[1->2]
        //                          unlock access
        // Since T1 contended on the access lock before T3 did, it will steal
        // the lock handoff. Adding order_lock in the condvar reacquire path
        // solves this because T1 will hold order_lock while waiting on access,
        // which will cause T3 to have to wait until T1 finishes its write,
        // which can't happen until T2 finishes the downgrade-read entirely.
        // The astute reader will also note that making waking writers use the
        // order_lock is better for not starving readers.
        unsafe {
            do task::unkillable {
                (&self.order_lock).acquire();
                do (&self.access_lock).access_cond |cond| {
                    (&self.order_lock).release();
                    do task::rekillable {
                        let opt_lock = Just(&self.order_lock);
                        blk(&Condvar { sem: cond.sem, order: opt_lock,
                                       token: NonCopyable::new() })
                    }
                }
            }
        }
    }

    /**
     * As write(), but with the ability to atomically 'downgrade' the lock;
     * i.e., to become a reader without letting other writers get the lock in
     * the meantime (such as unlocking and then re-locking as a reader would
     * do). The block takes a "write mode token" argument, which can be
     * transformed into a "read mode token" by calling downgrade(). Example:
     *
     * # Example
     *
     * ~~~ {.rust}
     * do lock.write_downgrade |mut write_token| {
     *     do write_token.write_cond |condvar| {
     *         ... exclusive access ...
     *     }
     *     let read_token = lock.downgrade(write_token);
     *     do read_token.read {
     *         ... shared access ...
     *     }
     * }
     * ~~~
     */
    pub fn write_downgrade<U>(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U {
        // Implementation slightly different from the slicker 'write's above.
        // The exit path is conditional on whether the caller downgrades.
        do task::unkillable {
            (&self.order_lock).acquire();
            (&self.access_lock).acquire();
            (&self.order_lock).release();
            do (|| {
                unsafe {
                    do task::rekillable {
                        blk(RWLockWriteMode { lock: self, token: NonCopyable::new() })
                    }
                }
            }).finally {
                let writer_or_last_reader;
                // Check if we're releasing from read mode or from write mode.
                let state = unsafe { &mut *self.state.get() };
                if state.read_mode {
                    // Releasing from read mode.
                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
                    assert!(old_count > 0);
                    // Check if other readers remain.
                    if old_count == 1 {
                        // Case 1: Writer downgraded & was the last reader
                        writer_or_last_reader = true;
                        state.read_mode = false;
                    } else {
                        // Case 2: Writer downgraded & was not the last reader
                        writer_or_last_reader = false;
                    }
                } else {
                    // Case 3: Writer did not downgrade
                    writer_or_last_reader = true;
                }
                if writer_or_last_reader {
                    // Nobody left inside; release the "reader cloud" lock.
                    (&self.access_lock).release();
                }
            }
        }
    }

    /// To be called inside of the write_downgrade block.
    pub fn downgrade<'a>(&self, token: RWLockWriteMode<'a>)
                         -> RWLockReadMode<'a> {
        if !borrow::ref_eq(self, token.lock) {
            fail!("Can't downgrade() with a different rwlock's write_mode!");
        }
        unsafe {
            do task::unkillable {
                let state = &mut *self.state.get();
                assert!(!state.read_mode);
                state.read_mode = true;
                // If a reader attempts to enter at this point, both the
                // downgrader and reader will set the mode flag. This is fine.
                let old_count = state.read_count.fetch_add(1, atomics::Release);
                // If another reader was already blocking, we need to hand-off
                // the "reader cloud" access lock to them.
                if old_count != 0 {
                    // Guaranteed not to let another writer in, because
                    // another reader was holding the order_lock. Hence they
                    // must be the one to get the access_lock (because all
                    // access_locks are acquired with order_lock held). See
                    // the comment in write_cond for more justification.
                    (&self.access_lock).release();
                }
            }
        }
        RWLockReadMode { lock: token.lock, token: NonCopyable::new() }
    }
}
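
// A minimal usage sketch (along the lines of the rwlock tests below): any
// number of read() blocks may run concurrently, while write()/write_cond()
// are exclusive; write_downgrade() is illustrated in its doc comment above.
//
//     let x = ~RWLock::new();
//     let x2 = (*x).clone();
//     do task::spawn {
//         do x2.read { /* shared access */ }
//     }
//     do x.write_cond |cond| {
//         /* exclusive access; may cond.wait() or cond.signal() */
//     }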

/// The "write permission" token used for rwlock.write_downgrade().
pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable }

/// The "read permission" token used for rwlock.write_downgrade().
pub struct RWLockReadMode<'self> { priv lock: &'self RWLock,
                                   priv token: NonCopyable }

impl<'self> RWLockWriteMode<'self> {
    /// Access the pre-downgrade rwlock in write mode.
    pub fn write<U>(&self, blk: &fn() -> U) -> U { blk() }
    /// Access the pre-downgrade rwlock in write mode with a condvar.
    pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        // Need to make the condvar use the order lock when reacquiring the
        // access lock. See comment in RWLock::write_cond for why.
        blk(&Condvar { sem:        &self.lock.access_lock,
                       order: Just(&self.lock.order_lock),
                       token: NonCopyable::new() })
    }
}

impl<'self> RWLockReadMode<'self> {
    /// Access the post-downgrade rwlock in read mode.
    pub fn read<U>(&self, blk: &fn() -> U) -> U { blk() }
}

/****************************************************************************
 * Tests
 ****************************************************************************/

#[cfg(test)]
mod tests {

    use sync::*;

    use std::cast;
    use std::cell::Cell;
    use std::comm;
    use std::result;
    use std::task;

    /************************************************************************
     * Semaphore tests
     ************************************************************************/
    #[test]
    fn test_sem_acquire_release() {
        let s = ~Semaphore::new(1);
        s.acquire();
        s.release();
        s.acquire();
    }
    #[test]
    fn test_sem_basic() {
        let s = ~Semaphore::new(1);
        do s.access { }
    }
    #[test]
    fn test_sem_as_mutex() {
        let s = ~Semaphore::new(1);
        let s2 = ~s.clone();
        do task::spawn || {
            do s2.access {
                do 5.times { task::deschedule(); }
            }
        }
        do s.access {
            do 5.times { task::deschedule(); }
        }
    }
    #[test]
    fn test_sem_as_cvar() {
        /* Child waits and parent signals */
        let (p,c) = comm::stream();
        let s = ~Semaphore::new(0);
        let s2 = ~s.clone();
        do task::spawn || {
            s2.acquire();
            c.send(());
        }
        do 5.times { task::deschedule(); }
        s.release();
        let _ = p.recv();

        /* Parent waits and child signals */
        let (p,c) = comm::stream();
        let s = ~Semaphore::new(0);
        let s2 = ~s.clone();
        do task::spawn || {
            do 5.times { task::deschedule(); }
            s2.release();
            let _ = p.recv();
        }
        s.acquire();
        c.send(());
    }
    #[test]
    fn test_sem_multi_resource() {
        // Parent and child both get in the critical section at the same
        // time, and shake hands.
        let s = ~Semaphore::new(2);
        let s2 = ~s.clone();
        let (p1,c1) = comm::stream();
        let (p2,c2) = comm::stream();
        do task::spawn || {
            do s2.access {
                let _ = p2.recv();
                c1.send(());
            }
        }
        do s.access {
            c2.send(());
            let _ = p1.recv();
        }
    }
    #[test]
    fn test_sem_runtime_friendly_blocking() {
        // Force the runtime to schedule two threads on the same sched_loop.
        // When one blocks, it should schedule the other one.
        do task::spawn_sched(task::SingleThreaded) {
            let s = ~Semaphore::new(1);
            let s2 = ~s.clone();
            let (p,c) = comm::stream();
            let child_data = Cell::new((s2, c));
            do s.access {
                let (s2, c) = child_data.take();
                do task::spawn || {
                    c.send(());
                    do s2.access { }
                    c.send(());
                }
                let _ = p.recv(); // wait for child to come alive
                do 5.times { task::deschedule(); } // let the child contend
            }
            let _ = p.recv(); // wait for child to be done
        }
    }
    /************************************************************************
     * Mutex tests
     ************************************************************************/
    #[test]
    fn test_mutex_lock() {
        // Unsafely achieve shared state, and do the textbook
        // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
        let (p,c) = comm::stream();
        let m = ~Mutex::new();
        let m2 = m.clone();
        let mut sharedstate = ~0;
        {
            let ptr: *int = &*sharedstate;
            do task::spawn || {
                let sharedstate: &mut int =
                    unsafe { cast::transmute(ptr) };
                access_shared(sharedstate, m2, 10);
                c.send(());

            }
        }
        {
            access_shared(sharedstate, m, 10);
            let _ = p.recv();

            assert_eq!(*sharedstate, 20);
        }

        fn access_shared(sharedstate: &mut int, m: &Mutex, n: uint) {
            do n.times {
                do m.lock {
                    let oldval = *sharedstate;
                    task::deschedule();
                    *sharedstate = oldval + 1;
                }
            }
        }
    }
    #[test]
    fn test_mutex_cond_wait() {
        let m = ~Mutex::new();

        // Child wakes up parent
        do m.lock_cond |cond| {
            let m2 = ~m.clone();
            do task::spawn || {
                do m2.lock_cond |cond| {
                    let woken = cond.signal();
                    assert!(woken);
                }
            }
            cond.wait();
        }
        // Parent wakes up child
        let (port,chan) = comm::stream();
        let m3 = ~m.clone();
        do task::spawn || {
            do m3.lock_cond |cond| {
                chan.send(());
                cond.wait();
                chan.send(());
            }
        }
        let _ = port.recv(); // Wait until child gets in the mutex
        do m.lock_cond |cond| {
            let woken = cond.signal();
            assert!(woken);
        }
        let _ = port.recv(); // Wait until child wakes up
    }
    #[cfg(test)]
    fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
        let m = ~Mutex::new();
        let mut ports = ~[];

        do num_waiters.times {
            let mi = ~m.clone();
            let (port, chan) = comm::stream();
            ports.push(port);
            do task::spawn || {
                do mi.lock_cond |cond| {
                    chan.send(());
                    cond.wait();
                    chan.send(());
                }
            }
        }

        // wait until all children get in the mutex
        for port in ports.iter() { let _ = port.recv(); }
        do m.lock_cond |cond| {
            let num_woken = cond.broadcast();
            assert_eq!(num_woken, num_waiters);
        }
        // wait until all children wake up
        for port in ports.iter() { let _ = port.recv(); }
    }
    #[test]
    fn test_mutex_cond_broadcast() {
        test_mutex_cond_broadcast_helper(12);
    }
    #[test]
    fn test_mutex_cond_broadcast_none() {
        test_mutex_cond_broadcast_helper(0);
    }
    #[test]
    fn test_mutex_cond_no_waiter() {
        let m = ~Mutex::new();
        let m2 = ~m.clone();
        do task::try || {
            do m.lock_cond |_x| { }
        };
        do m2.lock_cond |cond| {
            assert!(!cond.signal());
        }
    }
    #[test]
    fn test_mutex_killed_simple() {
        // Mutex must get automatically unlocked if failed/killed within.
        let m = ~Mutex::new();
        let m2 = ~m.clone();

        let result: result::Result<(),()> = do task::try || {
            do m2.lock {
                fail!();
            }
        };
        assert!(result.is_err());
        // child task must have finished by the time try returns
        do m.lock { }
    }
    #[ignore(reason = "linked failure")]
940
    #[test]
941
    fn test_mutex_killed_cond() {
942 943
        // Getting killed during cond wait must not corrupt the mutex while
        // unwinding (e.g. double unlock).
944
        let m = ~Mutex::new();
945 946
        let m2 = ~m.clone();

L
Luqman Aden 已提交
947
        let result: result::Result<(),()> = do task::try || {
948
            let (p,c) = comm::stream();
L
Luqman Aden 已提交
949
            do task::spawn || { // linked
950
                let _ = p.recv(); // wait for sibling to get in the mutex
K
Kevin Ballard 已提交
951
                task::deschedule();
952
                fail!();
953 954 955 956 957 958
            }
            do m2.lock_cond |cond| {
                c.send(()); // tell sibling go ahead
                cond.wait(); // block forever
            }
        };
P
Patrick Walton 已提交
959
        assert!(result.is_err());
960 961
        // child task must have finished by the time try returns
        do m.lock_cond |cond| {
962
            let woken = cond.signal();
P
Patrick Walton 已提交
963
            assert!(!woken);
964 965
        }
    }
    #[ignore(reason = "linked failure")]
967
    #[test]
968
    fn test_mutex_killed_broadcast() {
969 970
        use std::unstable::finally::Finally;

971
        let m = ~Mutex::new();
972
        let m2 = ~m.clone();
973
        let (p,c) = comm::stream();
974

L
Luqman Aden 已提交
975
        let result: result::Result<(),()> = do task::try || {
976
            let mut sibling_convos = ~[];
977
            do 2.times {
978
                let (p,c) = comm::stream();
979
                let c = Cell::new(c);
L
Luqman Aden 已提交
980
                sibling_convos.push(p);
981 982
                let mi = ~m2.clone();
                // spawn sibling task
983
                do task::spawn { // linked
984
                    do mi.lock_cond |cond| {
985
                        let c = c.take();
986
                        c.send(()); // tell sibling to go ahead
987 988 989 990 991 992 993
                        do (|| {
                            cond.wait(); // block forever
                        }).finally {
                            error!("task unwinding and sending");
                            c.send(());
                            error!("task unwinding and done sending");
                        }
994 995 996
                    }
                }
            }
D
Daniel Micay 已提交
997
            for p in sibling_convos.iter() {
998 999 1000
                let _ = p.recv(); // wait for sibling to get in the mutex
            }
            do m2.lock { }
L
Luqman Aden 已提交
1001
            c.send(sibling_convos); // let parent wait on all children
1002
            fail!();
1003
        };
P
Patrick Walton 已提交
1004
        assert!(result.is_err());
1005
        // child task must have finished by the time try returns
1006
        let r = p.recv();
D
Daniel Micay 已提交
1007
        for p in r.iter() { p.recv(); } // wait on all its siblings
1008 1009
        do m.lock_cond |cond| {
            let woken = cond.broadcast();
1010
            assert_eq!(woken, 0);
1011
        }
1012
    }
    #[test]
    fn test_mutex_cond_signal_on_0() {
        // Tests that signal_on(0) is equivalent to signal().
        let m = ~Mutex::new();
        do m.lock_cond |cond| {
            let m2 = ~m.clone();
            do task::spawn || {
                do m2.lock_cond |cond| {
                    cond.signal_on(0);
                }
            }
            cond.wait();
        }
    }
    #[test]
    fn test_mutex_different_conds() {
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(2);
            let m2 = ~m.clone();
            let (p,c) = comm::stream();
            do task::spawn || {
                do m2.lock_cond |cond| {
                    c.send(());
                    cond.wait_on(1);
                }
            }
            let _ = p.recv();
            do m.lock_cond |cond| {
                if !cond.signal_on(0) {
                    fail!(); // success; punt sibling awake.
                }
            }
        };
        assert!(result.is_err());
    }
    #[test]
    fn test_mutex_no_condvars() {
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(0);
            do m.lock_cond |cond| { cond.wait(); }
        };
        assert!(result.is_err());
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(0);
            do m.lock_cond |cond| { cond.signal(); }
        };
        assert!(result.is_err());
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(0);
            do m.lock_cond |cond| { cond.broadcast(); }
        };
        assert!(result.is_err());
    }
    /************************************************************************
     * Reader/writer lock tests
     ************************************************************************/
    #[cfg(test)]
    pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead }
    #[cfg(test)]
    fn lock_rwlock_in_mode(x: &RWLock, mode: RWLockMode, blk: &fn()) {
        match mode {
            Read => x.read(blk),
            Write => x.write(blk),
            Downgrade =>
                do x.write_downgrade |mode| {
                    do mode.write { blk() };
                },
            DowngradeRead =>
                do x.write_downgrade |mode| {
                    let mode = x.downgrade(mode);
                    do mode.read { blk() };
                },
        }
    }
    #[cfg(test)]
    fn test_rwlock_exclusion(x: ~RWLock,
                                 mode1: RWLockMode,
                                 mode2: RWLockMode) {
        // Test mutual exclusion between readers and writers. Just like the
        // mutex mutual exclusion test, a ways above.
        let (p,c) = comm::stream();
        let x2 = (*x).clone();
        let mut sharedstate = ~0;
        {
            let ptr: *int = &*sharedstate;
            do task::spawn || {
                let sharedstate: &mut int =
                    unsafe { cast::transmute(ptr) };
                access_shared(sharedstate, &x2, mode1, 10);
                c.send(());
            }
        }
        {
            access_shared(sharedstate, x, mode2, 10);
            let _ = p.recv();

            assert_eq!(*sharedstate, 20);
        }

        fn access_shared(sharedstate: &mut int, x: &RWLock, mode: RWLockMode,
                         n: uint) {
            do n.times {
                do lock_rwlock_in_mode(x, mode) {
                    let oldval = *sharedstate;
                    task::deschedule();
                    *sharedstate = oldval + 1;
                }
            }
        }
    }
    #[test]
    fn test_rwlock_readers_wont_modify_the_data() {
        test_rwlock_exclusion(~RWLock::new(), Read, Write);
        test_rwlock_exclusion(~RWLock::new(), Write, Read);
        test_rwlock_exclusion(~RWLock::new(), Read, Downgrade);
        test_rwlock_exclusion(~RWLock::new(), Downgrade, Read);
    }
    #[test]
    fn test_rwlock_writers_and_writers() {
        test_rwlock_exclusion(~RWLock::new(), Write, Write);
        test_rwlock_exclusion(~RWLock::new(), Write, Downgrade);
        test_rwlock_exclusion(~RWLock::new(), Downgrade, Write);
        test_rwlock_exclusion(~RWLock::new(), Downgrade, Downgrade);
    }
    #[cfg(test)]
    fn test_rwlock_handshake(x: ~RWLock,
                                 mode1: RWLockMode,
                                 mode2: RWLockMode,
                                 make_mode2_go_first: bool) {
        // Much like sem_multi_resource.
        let x2 = (*x).clone();
        let (p1,c1) = comm::stream();
        let (p2,c2) = comm::stream();
        do task::spawn || {
            if !make_mode2_go_first {
                let _ = p2.recv(); // parent sends to us once it locks, or ...
            }
            do lock_rwlock_in_mode(&x2, mode2) {
                if make_mode2_go_first {
                    c1.send(()); // ... we send to it once we lock
                }
                let _ = p2.recv();
                c1.send(());
            }
        }
        if make_mode2_go_first {
            let _ = p1.recv(); // child sends to us once it locks, or ...
        }
        do lock_rwlock_in_mode(x, mode1) {
            if !make_mode2_go_first {
                c2.send(()); // ... we send to it once we lock
            }
            c2.send(());
            let _ = p1.recv();
        }
    }
    #[test]
    fn test_rwlock_readers_and_readers() {
        test_rwlock_handshake(~RWLock::new(), Read, Read, false);
        // The downgrader needs to get in before the reader gets in, otherwise
        // they cannot end up reading at the same time.
        test_rwlock_handshake(~RWLock::new(), DowngradeRead, Read, false);
        test_rwlock_handshake(~RWLock::new(), Read, DowngradeRead, true);
        // Two downgrade_reads can never both end up reading at the same time.
    }
    #[test]
    fn test_rwlock_downgrade_unlock() {
        // Tests that downgrade can unlock the lock in both modes
        let x = ~RWLock::new();
        do lock_rwlock_in_mode(x, Downgrade) { }
        test_rwlock_handshake(x, Read, Read, false);
        let y = ~RWLock::new();
        do lock_rwlock_in_mode(y, DowngradeRead) { }
        test_rwlock_exclusion(y, Write, Write);
    }
    #[test]
    fn test_rwlock_read_recursive() {
        let x = ~RWLock::new();
        do x.read { do x.read { } }
    }
    #[test]
    fn test_rwlock_cond_wait() {
        // As test_mutex_cond_wait above.
        let x = ~RWLock::new();

        // Child wakes up parent
        do x.write_cond |cond| {
            let x2 = (*x).clone();
            do task::spawn || {
                do x2.write_cond |cond| {
                    let woken = cond.signal();
                    assert!(woken);
                }
            }
            cond.wait();
        }
        // Parent wakes up child
        let (port,chan) = comm::stream();
        let x3 = (*x).clone();
        do task::spawn || {
            do x3.write_cond |cond| {
                chan.send(());
                cond.wait();
                chan.send(());
            }
        }
        let _ = port.recv(); // Wait until child gets in the rwlock
        do x.read { } // Must be able to get in as a reader in the meantime
        do x.write_cond |cond| { // Or as another writer
            let woken = cond.signal();
            assert!(woken);
        }
        let _ = port.recv(); // Wait until child wakes up
        do x.read { } // Just for good measure
    }
    #[cfg(test)]
    fn test_rwlock_cond_broadcast_helper(num_waiters: uint,
                                             dg1: bool,
                                             dg2: bool) {
        // Much like the mutex broadcast test. Downgrade-enabled.
        fn lock_cond(x: &RWLock, downgrade: bool, blk: &fn(c: &Condvar)) {
            if downgrade {
                do x.write_downgrade |mode| {
                    do mode.write_cond |c| { blk(c) }
                }
            } else {
                do x.write_cond |c| { blk(c) }
            }
        }
        let x = ~RWLock::new();
        let mut ports = ~[];

        do num_waiters.times {
            let xi = (*x).clone();
            let (port, chan) = comm::stream();
            ports.push(port);
            do task::spawn || {
                do lock_cond(&xi, dg1) |cond| {
                    chan.send(());
                    cond.wait();
                    chan.send(());
                }
            }
        }

        // wait until all children get in the mutex
        for port in ports.iter() { let _ = port.recv(); }
        do lock_cond(x, dg2) |cond| {
            let num_woken = cond.broadcast();
            assert_eq!(num_woken, num_waiters);
        }
        // wait until all children wake up
        for port in ports.iter() { let _ = port.recv(); }
    }
    #[test]
    fn test_rwlock_cond_broadcast() {
        test_rwlock_cond_broadcast_helper(0, true, true);
        test_rwlock_cond_broadcast_helper(0, true, false);
        test_rwlock_cond_broadcast_helper(0, false, true);
        test_rwlock_cond_broadcast_helper(0, false, false);
        test_rwlock_cond_broadcast_helper(12, true, true);
        test_rwlock_cond_broadcast_helper(12, true, false);
        test_rwlock_cond_broadcast_helper(12, false, true);
        test_rwlock_cond_broadcast_helper(12, false, false);
    }
    #[cfg(test)]
    fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) {
        // Rwlock must get automatically unlocked if failed/killed within.
        let x = ~RWLock::new();
        let x2 = (*x).clone();

        let result: result::Result<(),()> = do task::try || {
            do lock_rwlock_in_mode(&x2, mode1) {
                fail!();
            }
        };
        assert!(result.is_err());
        // child task must have finished by the time try returns
        do lock_rwlock_in_mode(x, mode2) { }
    }
    #[test]
    fn test_rwlock_reader_killed_writer() {
        rwlock_kill_helper(Read, Write);
    }
    #[test]
    fn test_rwlock_writer_killed_reader() {
        rwlock_kill_helper(Write, Read);
    }
    #[test]
    fn test_rwlock_reader_killed_reader() {
        rwlock_kill_helper(Read, Read);
    }
    #[test]
    fn test_rwlock_writer_killed_writer() {
        rwlock_kill_helper(Write, Write);
    }
    #[test]
    fn test_rwlock_kill_downgrader() {
        rwlock_kill_helper(Downgrade, Read);
        rwlock_kill_helper(Read, Downgrade);
        rwlock_kill_helper(Downgrade, Write);
        rwlock_kill_helper(Write, Downgrade);
        rwlock_kill_helper(DowngradeRead, Read);
        rwlock_kill_helper(Read, DowngradeRead);
        rwlock_kill_helper(DowngradeRead, Write);
        rwlock_kill_helper(Write, DowngradeRead);
        rwlock_kill_helper(DowngradeRead, Downgrade);
        rwlock_kill_helper(DowngradeRead, Downgrade);
        rwlock_kill_helper(Downgrade, DowngradeRead);
        rwlock_kill_helper(Downgrade, DowngradeRead);
    }
    #[test] #[should_fail]
    fn test_rwlock_downgrade_cant_swap() {
        // Tests that you can't downgrade with a different rwlock's token.
        let x = ~RWLock::new();
        let y = ~RWLock::new();
        do x.write_downgrade |xwrite| {
            let mut xopt = Some(xwrite);
            do y.write_downgrade |_ywrite| {
                y.downgrade(xopt.take_unwrap());
                error!("oops, y.downgrade(x) should have failed!");
            }
        }
    }
}