sync.rs 49.0 KB
Newer Older
1
// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2 3 4 5 6 7 8 9 10
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

11 12 13 14 15 16 17
/**
 * The concurrency primitives you know and love.
 *
 * Maybe once we have a "core exports x only to std" mechanism, these can be
 * in std.
 */

18 19
use core::prelude::*;

20
use core::borrow;
21
use core::comm;
22
use core::task;
23 24
use core::unstable::sync::{Exclusive, exclusive, UnsafeAtomicRcBox};
use core::unstable::atomics;
25
use core::util;
26

27 28 29 30
/****************************************************************************
 * Internals
 ****************************************************************************/

31 32
// Each waiting task receives on one of these.
#[doc(hidden)]
33
type WaitEnd = comm::PortOne<()>;
34
#[doc(hidden)]
35
type SignalEnd = comm::ChanOne<()>;
36
// A doubly-ended queue of waiting tasks.
37
#[doc(hidden)]
38 39
struct Waitqueue { head: comm::Port<SignalEnd>,
                   tail: comm::Chan<SignalEnd> }
40

41
#[doc(hidden)]
B
Ben Blum 已提交
42
fn new_waitqueue() -> Waitqueue {
43
    let (block_head, block_tail) = comm::stream();
L
Luqman Aden 已提交
44
    Waitqueue { head: block_head, tail: block_tail }
B
Ben Blum 已提交
45 46
}

47
// Signals one live task from the queue.
48
#[doc(hidden)]
B
Ben Blum 已提交
49
fn signal_waitqueue(q: &Waitqueue) -> bool {
50 51 52 53
    // The peek is mandatory to make sure recv doesn't block.
    if q.head.peek() {
        // Pop and send a wakeup signal. If the waiter was killed, its port
        // will have closed. Keep trying until we get a live task.
54
        if comm::try_send_one(q.head.recv(), ()) {
55 56 57 58 59 60 61 62 63
            true
        } else {
            signal_waitqueue(q)
        }
    } else {
        false
    }
}

64
#[doc(hidden)]
B
Ben Blum 已提交
65
fn broadcast_waitqueue(q: &Waitqueue) -> uint {
66 67
    let mut count = 0;
    while q.head.peek() {
68
        if comm::try_send_one(q.head.recv(), ()) {
69 70 71 72 73 74 75
            count += 1;
        }
    }
    count
}

// The building-block used to make semaphores, mutexes, and rwlocks.
76
#[doc(hidden)]
B
Ben Blum 已提交
77
struct SemInner<Q> {
78
    count: int,
79
    waiters:   Waitqueue,
80 81
    // Can be either unit or another waitqueue. Some sems shouldn't come with
    // a condition variable attached, others should.
82
    blocked:   Q
B
Ben Blum 已提交
83
}
84

85
#[doc(hidden)]
86
struct Sem<Q>(Exclusive<SemInner<Q>>);
87

88
#[doc(hidden)]
89
fn new_sem<Q:Send>(count: int, q: Q) -> Sem<Q> {
B
Ben Blum 已提交
90
    Sem(exclusive(SemInner {
91
        count: count, waiters: new_waitqueue(), blocked: q }))
92
}
93
#[doc(hidden)]
B
Ben Blum 已提交
94
fn new_sem_and_signal(count: int, num_condvars: uint)
B
Ben Striegel 已提交
95
        -> Sem<~[Waitqueue]> {
96
    let mut queues = ~[];
97
    for num_condvars.times {
98
        queues.push(new_waitqueue());
99
    }
B
Ben Striegel 已提交
100
    new_sem(count, queues)
101 102
}

103
#[doc(hidden)]
104
impl<Q:Send> Sem<Q> {
105
    pub fn acquire(&self) {
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
        unsafe {
            let mut waiter_nobe = None;
            do (**self).with |state| {
                state.count -= 1;
                if state.count < 0 {
                    // Create waiter nobe.
                    let (WaitEnd, SignalEnd) = comm::oneshot();
                    // Tell outer scope we need to block.
                    waiter_nobe = Some(WaitEnd);
                    // Enqueue ourself.
                    state.waiters.tail.send(SignalEnd);
                }
            }
            // Uncomment if you wish to test for sem races. Not valgrind-friendly.
            /* for 1000.times { task::yield(); } */
            // Need to wait outside the exclusive.
            if waiter_nobe.is_some() {
                let _ = comm::recv_one(waiter_nobe.unwrap());
124 125 126
            }
        }
    }
127 128

    pub fn release(&self) {
129 130 131 132 133 134
        unsafe {
            do (**self).with |state| {
                state.count += 1;
                if state.count <= 0 {
                    signal_waitqueue(&state.waiters);
                }
135 136 137
            }
        }
    }
138
}
B
Ben Blum 已提交
139
// FIXME(#3154) move both copies of this into Sem<Q>, and unify the 2 structs
140
#[doc(hidden)]
141 142
impl Sem<()> {
    pub fn access<U>(&self, blk: &fn() -> U) -> U {
B
Brian Anderson 已提交
143
        let mut release = None;
144 145 146
        unsafe {
            do task::unkillable {
                self.acquire();
147
                release = Some(SemRelease(self));
148 149
            }
        }
150 151 152
        blk()
    }
}
153

154
#[doc(hidden)]
155
impl Sem<~[Waitqueue]> {
156
    pub fn access_waitqueue<U>(&self, blk: &fn() -> U) -> U {
B
Brian Anderson 已提交
157
        let mut release = None;
158 159 160
        unsafe {
            do task::unkillable {
                self.acquire();
161
                release = Some(SemAndSignalRelease(self));
162 163
            }
        }
164 165 166
        blk()
    }
}
167

T
Tim Chevalier 已提交
168
// FIXME(#3588) should go inside of access()
169
#[doc(hidden)]
170
type SemRelease<'self> = SemReleaseGeneric<'self, ()>;
171
#[doc(hidden)]
172
type SemAndSignalRelease<'self> = SemReleaseGeneric<'self, ~[Waitqueue]>;
173
#[doc(hidden)]
174
struct SemReleaseGeneric<'self, Q> { sem: &'self Sem<Q> }
175

176
#[doc(hidden)]
177
#[unsafe_destructor]
178
impl<'self, Q:Send> Drop for SemReleaseGeneric<'self, Q> {
L
Luqman Aden 已提交
179
    fn drop(&self) {
180
        self.sem.release();
181
    }
182
}
B
Brian Anderson 已提交
183

184
#[doc(hidden)]
185
fn SemRelease<'r>(sem: &'r Sem<()>) -> SemRelease<'r> {
T
Tim Chevalier 已提交
186
    SemReleaseGeneric {
B
Brian Anderson 已提交
187 188 189 190
        sem: sem
    }
}

191
#[doc(hidden)]
192 193
fn SemAndSignalRelease<'r>(sem: &'r Sem<~[Waitqueue]>)
                        -> SemAndSignalRelease<'r> {
T
Tim Chevalier 已提交
194
    SemReleaseGeneric {
B
Brian Anderson 已提交
195 196 197 198
        sem: sem
    }
}

199 200 201 202 203 204 205 206
// FIXME(#3598): Want to use an Option down below, but we need a custom enum
// that's not polymorphic to get around the fact that lifetimes are invariant
// inside of type parameters.
enum ReacquireOrderLock<'self> {
    Nothing, // c.c
    Just(&'self Semaphore),
}

207
/// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
208 209 210 211 212 213 214 215 216 217 218 219
pub struct Condvar<'self> {
    // The 'Sem' object associated with this condvar. This is the one that's
    // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
    priv sem: &'self Sem<~[Waitqueue]>,
    // This is (can be) an extra semaphore which is held around the reacquire
    // operation on the first one. This is only used in cvars associated with
    // rwlocks, and is needed to ensure that, when a downgrader is trying to
    // hand off the access lock (which would be the first field, here), a 2nd
    // writer waking up from a cvar wait can't race with a reader to steal it,
    // See the comment in write_cond for more detail.
    priv order: ReacquireOrderLock<'self>,
}
220

221
#[unsafe_destructor]
L
Luqman Aden 已提交
222
impl<'self> Drop for Condvar<'self> { fn drop(&self) {} }
223

224
impl<'self> Condvar<'self> {
225 226 227 228 229 230 231 232
    /**
     * Atomically drop the associated lock, and block until a signal is sent.
     *
     * # Failure
     * A task which is killed (i.e., by linked failure with another task)
     * while waiting on a condition variable will wake up, fail, and unlock
     * the associated lock as it unwinds.
     */
233
    pub fn wait(&self) { self.wait_on(0) }
234

235 236 237 238 239 240 241 242 243 244 245
    /**
     * As wait(), but can specify which of multiple condition variables to
     * wait on. Only a signal_on() or broadcast_on() with the same condvar_id
     * will wake this thread.
     *
     * The associated lock must have been initialised with an appropriate
     * number of condvars. The condvar_id must be between 0 and num_condvars-1
     * or else this call will fail.
     *
     * wait() is equivalent to wait_on(0).
     */
246
    pub fn wait_on(&self, condvar_id: uint) {
247
        // Create waiter nobe.
248
        let (WaitEnd, SignalEnd) = comm::oneshot();
L
Luqman Aden 已提交
249 250
        let mut WaitEnd   = Some(WaitEnd);
        let mut SignalEnd = Some(SignalEnd);
B
Brian Anderson 已提交
251 252
        let mut reacquire = None;
        let mut out_of_bounds = None;
253
        unsafe {
254
            do task::unkillable {
255 256
                // Release lock, 'atomically' enqueuing ourselves in so doing.
                do (**self.sem).with |state| {
Y
Youngmin Yoo 已提交
257
                    if condvar_id < state.blocked.len() {
258 259 260 261 262 263
                        // Drop the lock.
                        state.count += 1;
                        if state.count <= 0 {
                            signal_waitqueue(&state.waiters);
                        }
                        // Enqueue ourself to be woken up by a signaller.
264
                        let SignalEnd = SignalEnd.swap_unwrap();
L
Luqman Aden 已提交
265
                        state.blocked[condvar_id].tail.send(SignalEnd);
266
                    } else {
Y
Youngmin Yoo 已提交
267
                        out_of_bounds = Some(state.blocked.len());
268 269 270
                    }
                }

271 272 273 274
                // If yield checks start getting inserted anywhere, we can be
                // killed before or after enqueueing. Deciding whether to
                // unkillably reacquire the lock needs to happen atomically
                // wrt enqueuing.
275
                if out_of_bounds.is_none() {
276 277
                    reacquire = Some(CondvarReacquire { sem:   self.sem,
                                                        order: self.order });
278 279 280
                }
            }
        }
281 282 283 284
        do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
            // Unconditionally "block". (Might not actually block if a
            // signaller already sent -- I mean 'unconditionally' in contrast
            // with acquire().)
285
            let _ = comm::recv_one(WaitEnd.swap_unwrap());
286
        }
B
Ben Blum 已提交
287 288 289 290

        // This is needed for a failing condition variable to reacquire the
        // mutex during unwinding. As long as the wrapper (mutex, etc) is
        // bounded in when it gets released, this shouldn't hang forever.
291
        struct CondvarReacquire<'self> {
292
            sem: &'self Sem<~[Waitqueue]>,
293
            order: ReacquireOrderLock<'self>,
294 295
        }

296
        #[unsafe_destructor]
297
        impl<'self> Drop for CondvarReacquire<'self> {
L
Luqman Aden 已提交
298
            fn drop(&self) {
299 300 301
                unsafe {
                    // Needs to succeed, instead of itself dying.
                    do task::unkillable {
302 303 304 305 306 307 308 309
                        match self.order {
                            Just(lock) => do lock.access {
                                self.sem.acquire();
                            },
                            Nothing => {
                                self.sem.acquire();
                            },
                        }
310
                    }
B
Ben Blum 已提交
311 312 313
                }
            }
        }
314 315 316
    }

    /// Wake up a blocked task. Returns false if there was no blocked task.
317
    pub fn signal(&self) -> bool { self.signal_on(0) }
318

319
    /// As signal, but with a specified condvar_id. See wait_on.
320
    pub fn signal_on(&self, condvar_id: uint) -> bool {
321 322 323 324 325 326 327 328 329 330 331 332
        unsafe {
            let mut out_of_bounds = None;
            let mut result = false;
            do (**self.sem).with |state| {
                if condvar_id < state.blocked.len() {
                    result = signal_waitqueue(&state.blocked[condvar_id]);
                } else {
                    out_of_bounds = Some(state.blocked.len());
                }
            }
            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {
                result
333
            }
334
        }
335 336 337
    }

    /// Wake up all blocked tasks. Returns the number of tasks woken.
338
    pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
339

340
    /// As broadcast, but with a specified condvar_id. See wait_on.
341
    pub fn broadcast_on(&self, condvar_id: uint) -> uint {
B
Brian Anderson 已提交
342 343
        let mut out_of_bounds = None;
        let mut queue = None;
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
        unsafe {
            do (**self.sem).with |state| {
                if condvar_id < state.blocked.len() {
                    // To avoid :broadcast_heavy, we make a new waitqueue,
                    // swap it out with the old one, and broadcast on the
                    // old one outside of the little-lock.
                    queue = Some(util::replace(&mut state.blocked[condvar_id],
                                               new_waitqueue()));
                } else {
                    out_of_bounds = Some(state.blocked.len());
                }
            }
            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {
                let queue = queue.swap_unwrap();
                broadcast_waitqueue(&queue)
359
            }
360 361 362 363 364 365
        }
    }
}

// Checks whether a condvar ID was out of bounds, and fails if so, or does
// something else next on success.
366
#[inline]
367
#[doc(hidden)]
B
Brian Anderson 已提交
368
fn check_cvar_bounds<U>(out_of_bounds: Option<uint>, id: uint, act: &str,
369
                        blk: &fn() -> U) -> U {
370
    match out_of_bounds {
B
Brian Anderson 已提交
371
        Some(0) =>
M
Marvin Löbel 已提交
372
            fail!("%s with illegal ID %u - this lock has no condvars!", act, id),
B
Brian Anderson 已提交
373
        Some(length) =>
M
Marvin Löbel 已提交
374
            fail!("%s with illegal ID %u - ID must be less than %u", act, id, length),
B
Brian Anderson 已提交
375
        None => blk()
376 377 378
    }
}

379
#[doc(hidden)]
380
impl Sem<~[Waitqueue]> {
381 382
    // The only other places that condvars get built are rwlock.write_cond()
    // and rwlock_write_mode.
383
    pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
384
        do self.access_waitqueue {
385 386
            blk(&Condvar { sem: self, order: Nothing })
        }
387 388 389 390 391 392 393 394
    }
}

/****************************************************************************
 * Semaphores
 ****************************************************************************/

/// A counting, blocking, bounded-waiting semaphore.
395
struct Semaphore { priv sem: Sem<()> }
396 397

/// Create a new semaphore with the specified count.
398
pub fn semaphore(count: int) -> Semaphore {
B
Ben Blum 已提交
399
    Semaphore { sem: new_sem(count, ()) }
B
Ben Blum 已提交
400
}
401

402
impl Clone for Semaphore {
403
    /// Create a new handle to the semaphore.
B
Brian Anderson 已提交
404 405 406 407
    fn clone(&self) -> Semaphore {
        Semaphore { sem: Sem((*self.sem).clone()) }
    }
}
408

409
impl Semaphore {
410 411 412 413
    /**
     * Acquire a resource represented by the semaphore. Blocks if necessary
     * until resource(s) become available.
     */
414
    pub fn acquire(&self) { (&self.sem).acquire() }
415 416 417

    /**
     * Release a held resource represented by the semaphore. Wakes a blocked
418
     * contending task, if any exist. Won't block the caller.
419
     */
420
    pub fn release(&self) { (&self.sem).release() }
421 422

    /// Run a function with ownership of one of the semaphore's resources.
423
    pub fn access<U>(&self, blk: &fn() -> U) -> U { (&self.sem).access(blk) }
424 425 426 427 428 429 430 431 432
}

/****************************************************************************
 * Mutexes
 ****************************************************************************/

/**
 * A blocking, bounded-waiting, mutual exclusion lock with an associated
 * FIFO condition variable.
433 434 435 436
 *
 * # Failure
 * A task which fails while holding a mutex will unlock the mutex as it
 * unwinds.
437
 */
438
pub struct Mutex { priv sem: Sem<~[Waitqueue]> }
439

440
/// Create a new mutex, with one associated condvar.
441
pub fn Mutex() -> Mutex { mutex_with_condvars(1) }
442 443 444 445 446 447
/**
 * Create a new mutex, with a specified number of associated condvars. This
 * will allow calling wait_on/signal_on/broadcast_on with condvar IDs between
 * 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be allowed but
 * any operations on the condvar will fail.)
 */
448
pub fn mutex_with_condvars(num_condvars: uint) -> Mutex {
B
Ben Blum 已提交
449
    Mutex { sem: new_sem_and_signal(1, num_condvars) }
450
}
451

452
impl Clone for Mutex {
453
    /// Create a new handle to the mutex.
B
Brian Anderson 已提交
454 455
    fn clone(&self) -> Mutex { Mutex { sem: Sem((*self.sem).clone()) } }
}
456

457
impl Mutex {
458
    /// Run a function with ownership of the mutex.
459 460 461
    pub fn lock<U>(&self, blk: &fn() -> U) -> U {
        (&self.sem).access_waitqueue(blk)
    }
462 463

    /// Run a function with ownership of the mutex and a handle to a condvar.
464
    pub fn lock_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
B
Ben Blum 已提交
465
        (&self.sem).access_cond(blk)
466 467 468 469 470 471 472
    }
}

/****************************************************************************
 * Reader-writer locks
 ****************************************************************************/

473 474
// NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem

475
#[doc(hidden)]
B
Ben Blum 已提交
476
struct RWlockInner {
477 478 479 480 481 482 483 484 485
    // You might ask, "Why don't you need to use an atomic for the mode flag?"
    // This flag affects the behaviour of readers (for plain readers, they
    // assert on it; for downgraders, they use it to decide which mode to
    // unlock for). Consider that the flag is only unset when the very last
    // reader exits; therefore, it can never be unset during a reader/reader
    // (or reader/downgrader) race.
    // By the way, if we didn't care about the assert in the read unlock path,
    // we could instead store the mode flag in write_downgrade's stack frame,
    // and have the downgrade tokens store a borrowed pointer to it.
486
    read_mode:  bool,
487 488 489 490 491 492 493
    // The only way the count flag is ever accessed is with xadd. Since it is
    // a read-modify-write operation, multiple xadds on different cores will
    // always be consistent with respect to each other, so a monotonic/relaxed
    // consistency ordering suffices (i.e., no extra barriers are needed).
    // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
    // acquire/release orderings superfluously. Change these someday.
    read_count: atomics::AtomicUint,
494 495
}

496 497 498 499 500 501 502
/**
 * A blocking, no-starvation, reader-writer lock with an associated condvar.
 *
 * # Failure
 * A task which fails while holding an rwlock will unlock the rwlock as it
 * unwinds.
 */
503
pub struct RWlock {
504
    priv order_lock:  Semaphore,
B
Ben Striegel 已提交
505
    priv access_lock: Sem<~[Waitqueue]>,
506
    priv state:       UnsafeAtomicRcBox<RWlockInner>,
507 508
}

509
/// Create a new rwlock, with one associated condvar.
510
pub fn RWlock() -> RWlock { rwlock_with_condvars(1) }
511 512 513 514 515

/**
 * Create a new rwlock, with a specified number of associated condvars.
 * Similar to mutex_with_condvars.
 */
516
pub fn rwlock_with_condvars(num_condvars: uint) -> RWlock {
517 518 519 520 521
    let state = UnsafeAtomicRcBox::new(RWlockInner {
        read_mode:  false,
        read_count: atomics::AtomicUint::new(0),
    });
    RWlock { order_lock:  semaphore(1),
522
             access_lock: new_sem_and_signal(1, num_condvars),
523
             state:       state, }
524 525
}

526
impl RWlock {
B
Ben Blum 已提交
527
    /// Create a new handle to the rwlock.
528
    pub fn clone(&self) -> RWlock {
B
Ben Blum 已提交
529 530
        RWlock { order_lock:  (&(self.order_lock)).clone(),
                 access_lock: Sem((*self.access_lock).clone()),
531 532 533 534 535 536 537
                 state:       self.state.clone() }
    }

    /**
     * Run a function with the rwlock in read mode. Calls to 'read' from other
     * tasks may run concurrently with this one.
     */
538
    pub fn read<U>(&self, blk: &fn() -> U) -> U {
B
Brian Anderson 已提交
539
        let mut release = None;
B
Ben Blum 已提交
540 541 542
        unsafe {
            do task::unkillable {
                do (&self.order_lock).access {
543 544 545
                    let state = &mut *self.state.get();
                    let old_count = state.read_count.fetch_add(1, atomics::Acquire);
                    if old_count == 0 {
B
Ben Blum 已提交
546
                        (&self.access_lock).acquire();
547
                        state.read_mode = true;
B
Ben Blum 已提交
548 549
                    }
                }
B
Ben Blum 已提交
550
                release = Some(RWlockReleaseRead(self));
551 552
            }
        }
B
Ben Blum 已提交
553
        blk()
554 555 556 557 558 559
    }

    /**
     * Run a function with the rwlock in write mode. No calls to 'read' or
     * 'write' from other tasks will run concurrently with this one.
     */
560
    pub fn write<U>(&self, blk: &fn() -> U) -> U {
B
Ben Blum 已提交
561 562 563
        unsafe {
            do task::unkillable {
                (&self.order_lock).acquire();
564
                do (&self.access_lock).access_waitqueue {
565 566 567
                    (&self.order_lock).release();
                    task::rekillable(blk)
                }
B
Ben Blum 已提交
568
            }
569 570 571 572 573 574
        }
    }

    /**
     * As write(), but also with a handle to a condvar. Waiting on this
     * condvar will allow readers and writers alike to take the rwlock before
575 576
     * the waiting task is signalled. (Note: a writer that waited and then
     * was signalled might reacquire the lock before other waiting writers.)
577
     */
578
    pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602
        // It's important to thread our order lock into the condvar, so that
        // when a cond.wait() wakes up, it uses it while reacquiring the
        // access lock. If we permitted a waking-up writer to "cut in line",
        // there could arise a subtle race when a downgrader attempts to hand
        // off the reader cloud lock to a waiting reader. This race is tested
        // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
        // T1 (writer)              T2 (downgrader)             T3 (reader)
        // [in cond.wait()]
        //                          [locks for writing]
        //                          [holds access_lock]
        // [is signalled, perhaps by
        //  downgrader or a 4th thread]
        // tries to lock access(!)
        //                                                      lock order_lock
        //                                                      xadd read_count[0->1]
        //                                                      tries to lock access
        //                          [downgrade]
        //                          xadd read_count[1->2]
        //                          unlock access
        // Since T1 contended on the access lock before T3 did, it will steal
        // the lock handoff. Adding order_lock in the condvar reacquire path
        // solves this because T1 will hold order_lock while waiting on access,
        // which will cause T3 to have to wait until T1 finishes its write,
        // which can't happen until T2 finishes the downgrade-read entirely.
603 604
        // The astute reader will also note that making waking writers use the
        // order_lock is better for not starving readers.
605 606 607 608 609
        unsafe {
            do task::unkillable {
                (&self.order_lock).acquire();
                do (&self.access_lock).access_cond |cond| {
                    (&self.order_lock).release();
610 611 612 613
                    do task::rekillable {
                        let opt_lock = Just(&self.order_lock);
                        blk(&Condvar { order: opt_lock, ..*cond })
                    }
614 615 616
                }
            }
        }
617 618
    }

B
Ben Blum 已提交
619 620 621 622 623 624
    /**
     * As write(), but with the ability to atomically 'downgrade' the lock;
     * i.e., to become a reader without letting other writers get the lock in
     * the meantime (such as unlocking and then re-locking as a reader would
     * do). The block takes a "write mode token" argument, which can be
     * transformed into a "read mode token" by calling downgrade(). Example:
625 626 627 628
     *
     * # Example
     *
     * ~~~ {.rust}
629 630
     * do lock.write_downgrade |mut write_token| {
     *     do write_token.write_cond |condvar| {
631
     *         ... exclusive access ...
B
Ben Blum 已提交
632
     *     }
633
     *     let read_token = lock.downgrade(write_token);
634
     *     do read_token.read {
635 636 637 638
     *         ... shared access ...
     *     }
     * }
     * ~~~
B
Ben Blum 已提交
639
     */
640
    pub fn write_downgrade<U>(&self, blk: &fn(v: RWlockWriteMode) -> U) -> U {
B
Ben Blum 已提交
641 642
        // Implementation slightly different from the slicker 'write's above.
        // The exit path is conditional on whether the caller downgrades.
B
Brian Anderson 已提交
643
        let mut _release = None;
B
Ben Blum 已提交
644 645 646 647 648 649
        unsafe {
            do task::unkillable {
                (&self.order_lock).acquire();
                (&self.access_lock).acquire();
                (&self.order_lock).release();
            }
B
Ben Blum 已提交
650
            _release = Some(RWlockReleaseDowngrade(self));
B
Ben Blum 已提交
651
        }
B
Ben Blum 已提交
652
        blk(RWlockWriteMode { lock: self })
B
Ben Blum 已提交
653 654
    }

655
    /// To be called inside of the write_downgrade block.
656 657
    pub fn downgrade<'a>(&self, token: RWlockWriteMode<'a>)
                         -> RWlockReadMode<'a> {
658
        if !borrow::ref_eq(self, token.lock) {
659
            fail!("Can't downgrade() with a different rwlock's write_mode!");
B
Ben Blum 已提交
660 661 662
        }
        unsafe {
            do task::unkillable {
663 664 665 666 667 668 669 670 671
                let state = &mut *self.state.get();
                assert!(!state.read_mode);
                state.read_mode = true;
                // If a reader attempts to enter at this point, both the
                // downgrader and reader will set the mode flag. This is fine.
                let old_count = state.read_count.fetch_add(1, atomics::Release);
                // If another reader was already blocking, we need to hand-off
                // the "reader cloud" access lock to them.
                if old_count != 0 {
B
Ben Blum 已提交
672 673 674
                    // Guaranteed not to let another writer in, because
                    // another reader was holding the order_lock. Hence they
                    // must be the one to get the access_lock (because all
675 676
                    // access_locks are acquired with order_lock held). See
                    // the comment in write_cond for more justification.
B
Ben Blum 已提交
677 678 679 680
                    (&self.access_lock).release();
                }
            }
        }
B
Ben Blum 已提交
681
        RWlockReadMode { lock: token.lock }
B
Ben Blum 已提交
682
    }
683
}
684

T
Tim Chevalier 已提交
685
// FIXME(#3588) should go inside of read()
686
#[doc(hidden)]
687
struct RWlockReleaseRead<'self> {
688
    lock: &'self RWlock,
689 690
}

691
#[doc(hidden)]
692
#[unsafe_destructor]
693
impl<'self> Drop for RWlockReleaseRead<'self> {
L
Luqman Aden 已提交
694
    fn drop(&self) {
695 696
        unsafe {
            do task::unkillable {
697 698 699 700 701 702 703 704 705 706
                let state = &mut *self.lock.state.get();
                assert!(state.read_mode);
                let old_count = state.read_count.fetch_sub(1, atomics::Release);
                assert!(old_count > 0);
                if old_count == 1 {
                    state.read_mode = false;
                    // Note: this release used to be outside of a locked access
                    // to exclusive-protected state. If this code is ever
                    // converted back to such (instead of using atomic ops),
                    // this access MUST NOT go inside the exclusive access.
707
                    (&self.lock.access_lock).release();
B
Ben Blum 已提交
708
                }
709
            }
B
Ben Blum 已提交
710 711 712 713
        }
    }
}

714
#[doc(hidden)]
715
fn RWlockReleaseRead<'r>(lock: &'r RWlock) -> RWlockReleaseRead<'r> {
B
Brian Anderson 已提交
716 717 718 719 720
    RWlockReleaseRead {
        lock: lock
    }
}

T
Tim Chevalier 已提交
721
// FIXME(#3588) should go inside of downgrade()
722
#[doc(hidden)]
723
#[unsafe_destructor]
724
struct RWlockReleaseDowngrade<'self> {
725
    lock: &'self RWlock,
726 727
}

728
#[doc(hidden)]
729
#[unsafe_destructor]
730
impl<'self> Drop for RWlockReleaseDowngrade<'self> {
L
Luqman Aden 已提交
731
    fn drop(&self) {
732 733
        unsafe {
            do task::unkillable {
734 735 736 737 738 739 740 741 742 743
                let writer_or_last_reader;
                // Check if we're releasing from read mode or from write mode.
                let state = &mut *self.lock.state.get();
                if state.read_mode {
                    // Releasing from read mode.
                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
                    assert!(old_count > 0);
                    // Check if other readers remain.
                    if old_count == 1 {
                        // Case 1: Writer downgraded & was the last reader
744
                        writer_or_last_reader = true;
745 746 747 748
                        state.read_mode = false;
                    } else {
                        // Case 2: Writer downgraded & was not the last reader
                        writer_or_last_reader = false;
B
Ben Blum 已提交
749
                    }
750 751 752
                } else {
                    // Case 3: Writer did not downgrade
                    writer_or_last_reader = true;
753
                }
754
                if writer_or_last_reader {
755
                    // Nobody left inside; release the "reader cloud" lock.
756 757
                    (&self.lock.access_lock).release();
                }
758
            }
B
Ben Blum 已提交
759 760 761 762
        }
    }
}

763
#[doc(hidden)]
764 765
fn RWlockReleaseDowngrade<'r>(lock: &'r RWlock)
                           -> RWlockReleaseDowngrade<'r> {
B
Brian Anderson 已提交
766 767 768 769 770
    RWlockReleaseDowngrade {
        lock: lock
    }
}

B
Ben Blum 已提交
771
/// The "write permission" token used for rwlock.write_downgrade().
772
pub struct RWlockWriteMode<'self> { priv lock: &'self RWlock }
773
#[unsafe_destructor]
L
Luqman Aden 已提交
774
impl<'self> Drop for RWlockWriteMode<'self> { fn drop(&self) {} }
775

B
Ben Blum 已提交
776
/// The "read permission" token used for rwlock.write_downgrade().
777
pub struct RWlockReadMode<'self> { priv lock: &'self RWlock }
778
#[unsafe_destructor]
L
Luqman Aden 已提交
779
impl<'self> Drop for RWlockReadMode<'self> { fn drop(&self) {} }
B
Ben Blum 已提交
780

781
impl<'self> RWlockWriteMode<'self> {
B
Ben Blum 已提交
782
    /// Access the pre-downgrade rwlock in write mode.
783
    pub fn write<U>(&self, blk: &fn() -> U) -> U { blk() }
B
Ben Blum 已提交
784
    /// Access the pre-downgrade rwlock in write mode with a condvar.
785
    pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
786 787 788 789
        // Need to make the condvar use the order lock when reacquiring the
        // access lock. See comment in RWlock::write_cond for why.
        blk(&Condvar { sem:        &self.lock.access_lock,
                       order: Just(&self.lock.order_lock), })
B
Ben Blum 已提交
790 791
    }
}
792

793
impl<'self> RWlockReadMode<'self> {
B
Ben Blum 已提交
794
    /// Access the post-downgrade rwlock in read mode.
795
    pub fn read<U>(&self, blk: &fn() -> U) -> U { blk() }
B
Ben Blum 已提交
796 797
}

798 799 800
/****************************************************************************
 * Tests
 ****************************************************************************/
801 802 803

#[cfg(test)]
mod tests {
804
    use core::prelude::*;
805 806 807

    use sync::*;

P
Patrick Walton 已提交
808
    use core::cast;
809
    use core::cell::Cell;
810
    use core::comm;
811 812 813
    use core::result;
    use core::task;

B
Ben Blum 已提交
814 815 816
    /************************************************************************
     * Semaphore tests
     ************************************************************************/
817
    #[test]
818
    fn test_sem_acquire_release() {
B
Ben Blum 已提交
819
        let s = ~semaphore(1);
820 821 822 823 824
        s.acquire();
        s.release();
        s.acquire();
    }
    #[test]
825
    fn test_sem_basic() {
B
Ben Blum 已提交
826
        let s = ~semaphore(1);
827 828
        do s.access { }
    }
829
    #[test]
830
    fn test_sem_as_mutex() {
B
Ben Blum 已提交
831
        let s = ~semaphore(1);
832
        let s2 = ~s.clone();
L
Luqman Aden 已提交
833
        do task::spawn || {
834
            do s2.access {
835
                for 5.times { task::yield(); }
836 837 838
            }
        }
        do s.access {
839
            for 5.times { task::yield(); }
840 841 842
        }
    }
    #[test]
843
    fn test_sem_as_cvar() {
844
        /* Child waits and parent signals */
845
        let (p,c) = comm::stream();
B
Ben Blum 已提交
846
        let s = ~semaphore(0);
847
        let s2 = ~s.clone();
L
Luqman Aden 已提交
848
        do task::spawn || {
B
Ben Blum 已提交
849
            s2.acquire();
850 851
            c.send(());
        }
852
        for 5.times { task::yield(); }
B
Ben Blum 已提交
853
        s.release();
854
        let _ = p.recv();
B
Ben Blum 已提交
855

856
        /* Parent waits and child signals */
857
        let (p,c) = comm::stream();
B
Ben Blum 已提交
858
        let s = ~semaphore(0);
859
        let s2 = ~s.clone();
L
Luqman Aden 已提交
860
        do task::spawn || {
861
            for 5.times { task::yield(); }
B
Ben Blum 已提交
862
            s2.release();
863 864
            let _ = p.recv();
        }
B
Ben Blum 已提交
865
        s.acquire();
866 867 868
        c.send(());
    }
    #[test]
869
    fn test_sem_multi_resource() {
B
Ben Blum 已提交
870 871
        // Parent and child both get in the critical section at the same
        // time, and shake hands.
B
Ben Blum 已提交
872
        let s = ~semaphore(2);
B
Ben Blum 已提交
873
        let s2 = ~s.clone();
874 875
        let (p1,c1) = comm::stream();
        let (p2,c2) = comm::stream();
L
Luqman Aden 已提交
876
        do task::spawn || {
B
Ben Blum 已提交
877 878 879 880 881 882 883 884 885 886 887
            do s2.access {
                let _ = p2.recv();
                c1.send(());
            }
        }
        do s.access {
            c2.send(());
            let _ = p1.recv();
        }
    }
    #[test]
888
    fn test_sem_runtime_friendly_blocking() {
B
Ben Blum 已提交
889 890
        // Force the runtime to schedule two threads on the same sched_loop.
        // When one blocks, it should schedule the other one.
891
        do task::spawn_sched(task::ManualThreads(1)) {
B
Ben Blum 已提交
892
            let s = ~semaphore(1);
893
            let s2 = ~s.clone();
894
            let (p,c) = comm::stream();
895
            let child_data = Cell::new((s2, c));
896
            do s.access {
897
                let (s2, c) = child_data.take();
L
Luqman Aden 已提交
898
                do task::spawn || {
899 900 901 902 903 904 905 906 907 908
                    c.send(());
                    do s2.access { }
                    c.send(());
                }
                let _ = p.recv(); // wait for child to come alive
                for 5.times { task::yield(); } // let the child contend
            }
            let _ = p.recv(); // wait for child to be done
        }
    }
B
Ben Blum 已提交
909 910 911
    /************************************************************************
     * Mutex tests
     ************************************************************************/
912
    #[test]
913
    fn test_mutex_lock() {
914
        // Unsafely achieve shared state, and do the textbook
915
        // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
916
        let (p,c) = comm::stream();
917
        let m = ~Mutex();
B
Ben Striegel 已提交
918
        let m2 = m.clone();
919
        let mut sharedstate = ~0;
P
Patrick Walton 已提交
920 921 922 923 924 925 926
        {
            let ptr: *int = &*sharedstate;
            do task::spawn || {
                let sharedstate: &mut int =
                    unsafe { cast::transmute(ptr) };
                access_shared(sharedstate, m2, 10);
                c.send(());
B
Ben Blum 已提交
927

P
Patrick Walton 已提交
928
            }
929
        }
P
Patrick Walton 已提交
930 931 932
        {
            access_shared(sharedstate, m, 10);
            let _ = p.recv();
933

934
            assert_eq!(*sharedstate, 20);
P
Patrick Walton 已提交
935
        }
936

B
Ben Blum 已提交
937
        fn access_shared(sharedstate: &mut int, m: &Mutex, n: uint) {
938
            for n.times {
939
                do m.lock {
940 941 942 943 944 945 946 947
                    let oldval = *sharedstate;
                    task::yield();
                    *sharedstate = oldval + 1;
                }
            }
        }
    }
    #[test]
948
    fn test_mutex_cond_wait() {
949
        let m = ~Mutex();
950 951 952

        // Child wakes up parent
        do m.lock_cond |cond| {
953
            let m2 = ~m.clone();
L
Luqman Aden 已提交
954
            do task::spawn || {
955 956
                do m2.lock_cond |cond| {
                    let woken = cond.signal();
P
Patrick Walton 已提交
957
                    assert!(woken);
958
                }
959 960 961 962
            }
            cond.wait();
        }
        // Parent wakes up child
963
        let (port,chan) = comm::stream();
964
        let m3 = ~m.clone();
L
Luqman Aden 已提交
965
        do task::spawn || {
966 967 968 969 970 971 972 973
            do m3.lock_cond |cond| {
                chan.send(());
                cond.wait();
                chan.send(());
            }
        }
        let _ = port.recv(); // Wait until child gets in the mutex
        do m.lock_cond |cond| {
974
            let woken = cond.signal();
P
Patrick Walton 已提交
975
            assert!(woken);
976 977 978
        }
        let _ = port.recv(); // Wait until child wakes up
    }
979
    #[cfg(test)]
980
    fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
981
        let m = ~Mutex();
982 983 984 985
        let mut ports = ~[];

        for num_waiters.times {
            let mi = ~m.clone();
986
            let (port, chan) = comm::stream();
L
Luqman Aden 已提交
987 988
            ports.push(port);
            do task::spawn || {
989 990 991 992 993 994 995 996 997
                do mi.lock_cond |cond| {
                    chan.send(());
                    cond.wait();
                    chan.send(());
                }
            }
        }

        // wait until all children get in the mutex
998
        for ports.iter().advance |port| { let _ = port.recv(); }
999 1000
        do m.lock_cond |cond| {
            let num_woken = cond.broadcast();
1001
            assert_eq!(num_woken, num_waiters);
1002 1003
        }
        // wait until all children wake up
1004
        for ports.iter().advance |port| { let _ = port.recv(); }
1005
    }
1006
    #[test]
1007
    fn test_mutex_cond_broadcast() {
1008 1009 1010
        test_mutex_cond_broadcast_helper(12);
    }
    #[test]
1011
    fn test_mutex_cond_broadcast_none() {
1012 1013 1014
        test_mutex_cond_broadcast_helper(0);
    }
    #[test]
1015
    fn test_mutex_cond_no_waiter() {
1016
        let m = ~Mutex();
1017
        let m2 = ~m.clone();
L
Luqman Aden 已提交
1018
        do task::try || {
1019 1020 1021
            do m.lock_cond |_x| { }
        };
        do m2.lock_cond |cond| {
P
Patrick Walton 已提交
1022
            assert!(!cond.signal());
1023 1024
        }
    }
1025
    #[test] #[ignore(cfg(windows))]
1026
    fn test_mutex_killed_simple() {
1027
        // Mutex must get automatically unlocked if failed/killed within.
1028
        let m = ~Mutex();
1029 1030
        let m2 = ~m.clone();

L
Luqman Aden 已提交
1031
        let result: result::Result<(),()> = do task::try || {
1032
            do m2.lock {
1033
                fail!();
1034 1035
            }
        };
P
Patrick Walton 已提交
1036
        assert!(result.is_err());
1037 1038 1039 1040
        // child task must have finished by the time try returns
        do m.lock { }
    }
    #[test] #[ignore(cfg(windows))]
1041
    fn test_mutex_killed_cond() {
1042 1043
        // Getting killed during cond wait must not corrupt the mutex while
        // unwinding (e.g. double unlock).
1044
        let m = ~Mutex();
1045 1046
        let m2 = ~m.clone();

L
Luqman Aden 已提交
1047
        let result: result::Result<(),()> = do task::try || {
1048
            let (p,c) = comm::stream();
L
Luqman Aden 已提交
1049
            do task::spawn || { // linked
1050 1051
                let _ = p.recv(); // wait for sibling to get in the mutex
                task::yield();
1052
                fail!();
1053 1054 1055 1056 1057 1058
            }
            do m2.lock_cond |cond| {
                c.send(()); // tell sibling go ahead
                cond.wait(); // block forever
            }
        };
P
Patrick Walton 已提交
1059
        assert!(result.is_err());
1060 1061
        // child task must have finished by the time try returns
        do m.lock_cond |cond| {
1062
            let woken = cond.signal();
P
Patrick Walton 已提交
1063
            assert!(!woken);
1064 1065 1066
        }
    }
    #[test] #[ignore(cfg(windows))]
1067
    fn test_mutex_killed_broadcast() {
1068
        let m = ~Mutex();
1069
        let m2 = ~m.clone();
1070
        let (p,c) = comm::stream();
1071

L
Luqman Aden 已提交
1072
        let result: result::Result<(),()> = do task::try || {
1073 1074
            let mut sibling_convos = ~[];
            for 2.times {
1075
                let (p,c) = comm::stream();
1076
                let c = Cell::new(c);
L
Luqman Aden 已提交
1077
                sibling_convos.push(p);
1078 1079
                let mi = ~m2.clone();
                // spawn sibling task
1080
                do task::spawn { // linked
1081
                    do mi.lock_cond |cond| {
1082
                        let c = c.take();
1083
                        c.send(()); // tell sibling to go ahead
L
Luqman Aden 已提交
1084
                        let _z = SendOnFailure(c);
1085 1086 1087 1088
                        cond.wait(); // block forever
                    }
                }
            }
1089
            for sibling_convos.iter().advance |p| {
1090 1091 1092
                let _ = p.recv(); // wait for sibling to get in the mutex
            }
            do m2.lock { }
L
Luqman Aden 已提交
1093
            c.send(sibling_convos); // let parent wait on all children
1094
            fail!();
1095
        };
P
Patrick Walton 已提交
1096
        assert!(result.is_err());
1097
        // child task must have finished by the time try returns
1098 1099
        let r = p.recv();
        for r.iter().advance |p| { p.recv(); } // wait on all its siblings
1100 1101
        do m.lock_cond |cond| {
            let woken = cond.broadcast();
1102
            assert_eq!(woken, 0);
1103
        }
B
Ben Blum 已提交
1104
        struct SendOnFailure {
1105
            c: comm::Chan<()>,
1106 1107
        }

1108
        impl Drop for SendOnFailure {
L
Luqman Aden 已提交
1109
            fn drop(&self) {
1110 1111
                self.c.send(());
            }
1112
        }
B
Brian Anderson 已提交
1113

1114
        fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure {
B
Brian Anderson 已提交
1115
            SendOnFailure {
L
Luqman Aden 已提交
1116
                c: c
B
Brian Anderson 已提交
1117 1118
            }
        }
1119
    }
1120
    #[test]
1121
    fn test_mutex_cond_signal_on_0() {
1122
        // Tests that signal_on(0) is equivalent to signal().
1123
        let m = ~Mutex();
1124 1125
        do m.lock_cond |cond| {
            let m2 = ~m.clone();
L
Luqman Aden 已提交
1126
            do task::spawn || {
1127 1128 1129 1130 1131 1132 1133 1134
                do m2.lock_cond |cond| {
                    cond.signal_on(0);
                }
            }
            cond.wait();
        }
    }
    #[test] #[ignore(cfg(windows))]
1135
    fn test_mutex_different_conds() {
1136 1137 1138
        let result = do task::try {
            let m = ~mutex_with_condvars(2);
            let m2 = ~m.clone();
1139
            let (p,c) = comm::stream();
L
Luqman Aden 已提交
1140
            do task::spawn || {
1141 1142 1143 1144 1145 1146 1147 1148
                do m2.lock_cond |cond| {
                    c.send(());
                    cond.wait_on(1);
                }
            }
            let _ = p.recv();
            do m.lock_cond |cond| {
                if !cond.signal_on(0) {
1149
                    fail!(); // success; punt sibling awake.
1150 1151 1152
                }
            }
        };
P
Patrick Walton 已提交
1153
        assert!(result.is_err());
1154 1155
    }
    #[test] #[ignore(cfg(windows))]
1156
    fn test_mutex_no_condvars() {
1157 1158 1159 1160
        let result = do task::try {
            let m = ~mutex_with_condvars(0);
            do m.lock_cond |cond| { cond.wait(); }
        };
P
Patrick Walton 已提交
1161
        assert!(result.is_err());
1162 1163 1164 1165
        let result = do task::try {
            let m = ~mutex_with_condvars(0);
            do m.lock_cond |cond| { cond.signal(); }
        };
P
Patrick Walton 已提交
1166
        assert!(result.is_err());
1167 1168 1169 1170
        let result = do task::try {
            let m = ~mutex_with_condvars(0);
            do m.lock_cond |cond| { cond.broadcast(); }
        };
P
Patrick Walton 已提交
1171
        assert!(result.is_err());
1172
    }
B
Ben Blum 已提交
1173 1174 1175 1176
    /************************************************************************
     * Reader/writer lock tests
     ************************************************************************/
    #[cfg(test)]
1177
    pub enum RWlockMode { Read, Write, Downgrade, DowngradeRead }
B
Ben Blum 已提交
1178
    #[cfg(test)]
1179
    fn lock_rwlock_in_mode(x: &RWlock, mode: RWlockMode, blk: &fn()) {
B
Ben Blum 已提交
1180
        match mode {
B
Ben Blum 已提交
1181 1182 1183
            Read => x.read(blk),
            Write => x.write(blk),
            Downgrade =>
1184
                do x.write_downgrade |mode| {
1185
                    (&mode).write(blk);
1186
                },
B
Ben Blum 已提交
1187
            DowngradeRead =>
1188
                do x.write_downgrade |mode| {
L
Luqman Aden 已提交
1189
                    let mode = x.downgrade(mode);
1190
                    (&mode).read(blk);
1191
                },
B
Ben Blum 已提交
1192
        }
B
Ben Blum 已提交
1193
    }
1194
    #[cfg(test)]
1195
    fn test_rwlock_exclusion(x: ~RWlock,
1196 1197
                                 mode1: RWlockMode,
                                 mode2: RWlockMode) {
1198 1199
        // Test mutual exclusion between readers and writers. Just like the
        // mutex mutual exclusion test, a ways above.
1200
        let (p,c) = comm::stream();
B
Ben Striegel 已提交
1201
        let x2 = (*x).clone();
1202
        let mut sharedstate = ~0;
P
Patrick Walton 已提交
1203 1204 1205 1206 1207 1208 1209 1210
        {
            let ptr: *int = &*sharedstate;
            do task::spawn || {
                let sharedstate: &mut int =
                    unsafe { cast::transmute(ptr) };
                access_shared(sharedstate, &x2, mode1, 10);
                c.send(());
            }
1211
        }
P
Patrick Walton 已提交
1212 1213 1214
        {
            access_shared(sharedstate, x, mode2, 10);
            let _ = p.recv();
1215

1216
            assert_eq!(*sharedstate, 20);
P
Patrick Walton 已提交
1217
        }
1218

B
Ben Blum 已提交
1219
        fn access_shared(sharedstate: &mut int, x: &RWlock, mode: RWlockMode,
1220 1221
                         n: uint) {
            for n.times {
B
Ben Blum 已提交
1222
                do lock_rwlock_in_mode(x, mode) {
1223 1224 1225 1226 1227 1228 1229 1230
                    let oldval = *sharedstate;
                    task::yield();
                    *sharedstate = oldval + 1;
                }
            }
        }
    }
    #[test]
1231
    fn test_rwlock_readers_wont_modify_the_data() {
1232 1233 1234 1235
        test_rwlock_exclusion(~RWlock(), Read, Write);
        test_rwlock_exclusion(~RWlock(), Write, Read);
        test_rwlock_exclusion(~RWlock(), Read, Downgrade);
        test_rwlock_exclusion(~RWlock(), Downgrade, Read);
1236 1237
    }
    #[test]
1238
    fn test_rwlock_writers_and_writers() {
1239 1240 1241 1242
        test_rwlock_exclusion(~RWlock(), Write, Write);
        test_rwlock_exclusion(~RWlock(), Write, Downgrade);
        test_rwlock_exclusion(~RWlock(), Downgrade, Write);
        test_rwlock_exclusion(~RWlock(), Downgrade, Downgrade);
1243
    }
B
Ben Blum 已提交
1244
    #[cfg(test)]
1245
    fn test_rwlock_handshake(x: ~RWlock,
1246 1247 1248
                                 mode1: RWlockMode,
                                 mode2: RWlockMode,
                                 make_mode2_go_first: bool) {
1249
        // Much like sem_multi_resource.
B
Ben Striegel 已提交
1250
        let x2 = (*x).clone();
1251 1252
        let (p1,c1) = comm::stream();
        let (p2,c2) = comm::stream();
L
Luqman Aden 已提交
1253
        do task::spawn || {
B
Ben Blum 已提交
1254 1255 1256
            if !make_mode2_go_first {
                let _ = p2.recv(); // parent sends to us once it locks, or ...
            }
B
Ben Striegel 已提交
1257
            do lock_rwlock_in_mode(&x2, mode2) {
B
Ben Blum 已提交
1258 1259 1260
                if make_mode2_go_first {
                    c1.send(()); // ... we send to it once we lock
                }
1261 1262 1263 1264
                let _ = p2.recv();
                c1.send(());
            }
        }
B
Ben Blum 已提交
1265 1266 1267 1268 1269 1270 1271
        if make_mode2_go_first {
            let _ = p1.recv(); // child sends to us once it locks, or ...
        }
        do lock_rwlock_in_mode(x, mode1) {
            if !make_mode2_go_first {
                c2.send(()); // ... we send to it once we lock
            }
1272 1273 1274
            c2.send(());
            let _ = p1.recv();
        }
B
Ben Blum 已提交
1275
    }
1276
    #[test]
1277
    fn test_rwlock_readers_and_readers() {
1278
        test_rwlock_handshake(~RWlock(), Read, Read, false);
B
Ben Blum 已提交
1279 1280
        // The downgrader needs to get in before the reader gets in, otherwise
        // they cannot end up reading at the same time.
1281 1282
        test_rwlock_handshake(~RWlock(), DowngradeRead, Read, false);
        test_rwlock_handshake(~RWlock(), Read, DowngradeRead, true);
B
Ben Blum 已提交
1283 1284 1285
        // Two downgrade_reads can never both end up reading at the same time.
    }
    #[test]
1286
    fn test_rwlock_downgrade_unlock() {
B
Ben Blum 已提交
1287
        // Tests that downgrade can unlock the lock in both modes
1288
        let x = ~RWlock();
B
Ben Blum 已提交
1289
        do lock_rwlock_in_mode(x, Downgrade) { }
L
Luqman Aden 已提交
1290
        test_rwlock_handshake(x, Read, Read, false);
1291
        let y = ~RWlock();
B
Ben Blum 已提交
1292
        do lock_rwlock_in_mode(y, DowngradeRead) { }
L
Luqman Aden 已提交
1293
        test_rwlock_exclusion(y, Write, Write);
B
Ben Blum 已提交
1294 1295
    }
    #[test]
1296
    fn test_rwlock_read_recursive() {
1297
        let x = ~RWlock();
B
Ben Blum 已提交
1298 1299 1300
        do x.read { do x.read { } }
    }
    #[test]
1301
    fn test_rwlock_cond_wait() {
1302
        // As test_mutex_cond_wait above.
1303
        let x = ~RWlock();
1304 1305 1306

        // Child wakes up parent
        do x.write_cond |cond| {
B
Ben Striegel 已提交
1307
            let x2 = (*x).clone();
L
Luqman Aden 已提交
1308
            do task::spawn || {
1309 1310
                do x2.write_cond |cond| {
                    let woken = cond.signal();
P
Patrick Walton 已提交
1311
                    assert!(woken);
1312 1313 1314 1315 1316
                }
            }
            cond.wait();
        }
        // Parent wakes up child
1317
        let (port,chan) = comm::stream();
B
Ben Striegel 已提交
1318
        let x3 = (*x).clone();
L
Luqman Aden 已提交
1319
        do task::spawn || {
1320 1321 1322 1323 1324 1325 1326 1327 1328 1329
            do x3.write_cond |cond| {
                chan.send(());
                cond.wait();
                chan.send(());
            }
        }
        let _ = port.recv(); // Wait until child gets in the rwlock
        do x.read { } // Must be able to get in as a reader in the meantime
        do x.write_cond |cond| { // Or as another writer
            let woken = cond.signal();
P
Patrick Walton 已提交
1330
            assert!(woken);
1331 1332 1333 1334
        }
        let _ = port.recv(); // Wait until child wakes up
        do x.read { } // Just for good measure
    }
B
Ben Blum 已提交
1335
    #[cfg(test)]
1336
    fn test_rwlock_cond_broadcast_helper(num_waiters: uint,
1337 1338
                                             dg1: bool,
                                             dg2: bool) {
B
Ben Blum 已提交
1339
        // Much like the mutex broadcast test. Downgrade-enabled.
1340
        fn lock_cond(x: &RWlock, downgrade: bool, blk: &fn(c: &Condvar)) {
B
Ben Blum 已提交
1341
            if downgrade {
1342
                do x.write_downgrade |mode| {
1343
                    (&mode).write_cond(blk)
1344
                }
B
Ben Blum 已提交
1345 1346 1347 1348
            } else {
                x.write_cond(blk)
            }
        }
1349
        let x = ~RWlock();
B
Ben Blum 已提交
1350 1351 1352
        let mut ports = ~[];

        for num_waiters.times {
B
Ben Striegel 已提交
1353
            let xi = (*x).clone();
1354
            let (port, chan) = comm::stream();
L
Luqman Aden 已提交
1355 1356
            ports.push(port);
            do task::spawn || {
B
Ben Striegel 已提交
1357
                do lock_cond(&xi, dg1) |cond| {
B
Ben Blum 已提交
1358 1359 1360 1361 1362 1363 1364 1365
                    chan.send(());
                    cond.wait();
                    chan.send(());
                }
            }
        }

        // wait until all children get in the mutex
1366
        for ports.iter().advance |port| { let _ = port.recv(); }
B
Ben Blum 已提交
1367 1368
        do lock_cond(x, dg2) |cond| {
            let num_woken = cond.broadcast();
1369
            assert_eq!(num_woken, num_waiters);
B
Ben Blum 已提交
1370 1371
        }
        // wait until all children wake up
1372
        for ports.iter().advance |port| { let _ = port.recv(); }
B
Ben Blum 已提交
1373 1374
    }
    #[test]
1375
    fn test_rwlock_cond_broadcast() {
B
Ben Blum 已提交
1376 1377 1378 1379 1380 1381 1382 1383 1384
        test_rwlock_cond_broadcast_helper(0, true, true);
        test_rwlock_cond_broadcast_helper(0, true, false);
        test_rwlock_cond_broadcast_helper(0, false, true);
        test_rwlock_cond_broadcast_helper(0, false, false);
        test_rwlock_cond_broadcast_helper(12, true, true);
        test_rwlock_cond_broadcast_helper(12, true, false);
        test_rwlock_cond_broadcast_helper(12, false, true);
        test_rwlock_cond_broadcast_helper(12, false, false);
    }
B
Ben Blum 已提交
1385
    #[cfg(test)] #[ignore(cfg(windows))]
1386
    fn rwlock_kill_helper(mode1: RWlockMode, mode2: RWlockMode) {
B
Ben Blum 已提交
1387
        // Mutex must get automatically unlocked if failed/killed within.
1388
        let x = ~RWlock();
B
Ben Striegel 已提交
1389
        let x2 = (*x).clone();
1390

L
Luqman Aden 已提交
1391
        let result: result::Result<(),()> = do task::try || {
B
Ben Striegel 已提交
1392
            do lock_rwlock_in_mode(&x2, mode1) {
1393
                fail!();
B
Ben Blum 已提交
1394 1395
            }
        };
P
Patrick Walton 已提交
1396
        assert!(result.is_err());
B
Ben Blum 已提交
1397
        // child task must have finished by the time try returns
B
Ben Blum 已提交
1398
        do lock_rwlock_in_mode(x, mode2) { }
1399
    }
B
Ben Blum 已提交
1400
    #[test] #[ignore(cfg(windows))]
1401
    fn test_rwlock_reader_killed_writer() {
1402 1403
        rwlock_kill_helper(Read, Write);
    }
B
Ben Blum 已提交
1404
    #[test] #[ignore(cfg(windows))]
1405
    fn test_rwlock_writer_killed_reader() {
1406 1407
        rwlock_kill_helper(Write,Read );
    }
B
Ben Blum 已提交
1408
    #[test] #[ignore(cfg(windows))]
1409
    fn test_rwlock_reader_killed_reader() {
1410 1411
        rwlock_kill_helper(Read, Read );
    }
B
Ben Blum 已提交
1412
    #[test] #[ignore(cfg(windows))]
1413
    fn test_rwlock_writer_killed_writer() {
1414 1415
        rwlock_kill_helper(Write,Write);
    }
B
Ben Blum 已提交
1416
    #[test] #[ignore(cfg(windows))]
1417
    fn test_rwlock_kill_downgrader() {
B
Ben Blum 已提交
1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429
        rwlock_kill_helper(Downgrade, Read);
        rwlock_kill_helper(Read, Downgrade);
        rwlock_kill_helper(Downgrade, Write);
        rwlock_kill_helper(Write, Downgrade);
        rwlock_kill_helper(DowngradeRead, Read);
        rwlock_kill_helper(Read, DowngradeRead);
        rwlock_kill_helper(DowngradeRead, Write);
        rwlock_kill_helper(Write, DowngradeRead);
        rwlock_kill_helper(DowngradeRead, Downgrade);
        rwlock_kill_helper(DowngradeRead, Downgrade);
        rwlock_kill_helper(Downgrade, DowngradeRead);
        rwlock_kill_helper(Downgrade, DowngradeRead);
B
Ben Blum 已提交
1430
    }
B
Ben Blum 已提交
1431
    #[test] #[should_fail] #[ignore(cfg(windows))]
1432
    fn test_rwlock_downgrade_cant_swap() {
B
Ben Blum 已提交
1433
        // Tests that you can't downgrade with a different rwlock's token.
1434 1435
        let x = ~RWlock();
        let y = ~RWlock();
B
Ben Blum 已提交
1436
        do x.write_downgrade |xwrite| {
L
Luqman Aden 已提交
1437
            let mut xopt = Some(xwrite);
B
Ben Blum 已提交
1438
            do y.write_downgrade |_ywrite| {
1439
                y.downgrade(xopt.swap_unwrap());
1440
                error!("oops, y.downgrade(x) should have failed!");
B
Ben Blum 已提交
1441 1442 1443
            }
        }
    }
1444
}