Commit 08ce1788 authored by Corey Richardson

rollup merge of #19274: alexcrichton/rewrite-sync

This commit is a reimplementation of `std::sync` to be based on the
system-provided primitives wherever possible. The previous implementation was
fundamentally built on top of channels, and as part of the runtime reform it has
become clear that this is not the level of abstraction that the standard
library should be providing. This rewrite aims to provide as thin a shim as
possible
on top of the system primitives in order to make them safe.

The overall interface of the `std::sync` module has in general not changed, but
there are a few important distinctions, highlighted below:

* The condition variable type, `Condvar`, has been separated out of a `Mutex`.
  A condition variable is now an entirely separate type. This separation
  benefits users who only use one mutex, and provides a clearer distinction of
  who's responsible for managing condition variables (the application).

* All of `Condvar`, `Mutex`, and `RWLock` are now directly built on top of
  system primitives rather than using a custom implementation. The `Once`,
  `Barrier`, and `Semaphore` types are still built upon these abstractions of
  the system primitives.

* The `Condvar`, `Mutex`, and `RWLock` types all have a new static type and
  constant initializer corresponding to them. These are provided primarily for C
  FFI interoperation, but are often useful for simply having a global lock (see
  the sketch after this list). The types, however, will leak memory unless
  `destroy()` is called on them, which is clearly documented.

* The fundamental architecture of this design is to provide two separate layers.
  The first layer is that exposed by `sys_common` which is a cross-platform
  bare-metal abstraction of the system synchronization primitives. No attempt is
  made at making this layer safe, and it is quite unsafe to use! It is currently
  not exported as part of the API of the standard library, but the stabilization
  of the `sys` module will ensure that these are exposed in time. The
  purpose of this layer is to provide the core cross-platform abstractions
  to implementors when necessary.

  The second layer is the layer provided by `std::sync` which is intended to be
  the thinnest possible layer on top of `sys_common` which is entirely safe to
  use. There are a few concerns which need to be addressed when making these
  system primitives safe:

    * Once used, the OS primitives can never be **moved**. This means that they
      essentially need to have a stable address. The static primitives use
      `&'static self` to enforce this, and the non-static primitives all use a
      `Box` to provide this guarantee.

    * Poisoning is leveraged to ensure that invalid data is not accessible from
      other tasks after one has panicked.

  In addition to these overall blanket safety limitations, each primitive has a
  few restrictions of its own:

    * Mutexes and rwlocks can only be unlocked from the same thread that they
      were locked by. This is achieved through RAII lock guards which cannot be
      sent across threads.

    * Mutexes and rwlocks can only be unlocked if they were previously locked.
      This is achieved by not exposing an unlocking method.

    * A condition variable can only be waited on with a locked mutex. This is
      achieved by requiring a `MutexGuard` in the `wait()` method.

    * A condition variable cannot be used concurrently with more than one mutex.
      This is guaranteed by dynamically binding a condition variable to
      precisely one mutex for its entire lifecycle. This restriction may be
      relaxed in the future (a mutex is unbound when no threads are waiting on
      the condvar), but for now it is sufficient to guarantee safety.

* Condvars support timeouts for their blocking operations. The
  implementation for these operations is provided by the system.
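
For reference, here is a minimal sketch of how the static primitives and the
condvar rules above compose, using the API introduced in this commit (the
predicate loop around `wait` is elided for brevity):

```rust
use std::sync::{StaticMutex, StaticCondvar, MUTEX_INIT, CONDVAR_INIT};

// Statically-initialized primitives, suitable for globals and C FFI.
static LOCK: StaticMutex = MUTEX_INIT;
static CVAR: StaticCondvar = CONDVAR_INIT;

fn wait_for_event() {
    // Locking returns an RAII guard; the lock is released when the guard
    // is dropped, so no explicit `unlock` is ever exposed.
    let guard = LOCK.lock();
    // `wait` requires the guard, so the condvar can only be waited on with
    // a locked mutex, and it dynamically binds itself to that one mutex.
    CVAR.wait(&guard);
    drop(guard);
    // The static primitives leak memory unless eventually destroyed:
    // unsafe { CVAR.destroy(); LOCK.destroy(); }
}
```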

Due to the modification of the `Condvar` API, removal of the `std::sync::mutex`
API, and reimplementation, this is a breaking change. Most code should be fairly
easy to port using the examples in the documentation of these primitives.
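
As a rough illustration of the port, the old data-less mutex (reconstructed
from the removed code later in this diff) becomes a mutex that owns the data
it protects:

```rust
use std::sync::Mutex;

// Before: `std::sync::mutex::Mutex` guarded no data.
//     let m = Mutex::new();
//     let guard = m.lock();   // shared state lived elsewhere
//     drop(guard);            // unlock

// After: the data lives inside the mutex and is reached through the RAII
// guard's Deref/DerefMut implementations.
fn main() {
    let m = Mutex::new(0u);
    {
        let mut data = m.lock();
        *data += 1;
    } // unlocked here when the guard goes out of scope
    assert_eq!(*m.lock(), 1);
}
```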

[breaking-change]

Closes #17094
Closes #18003
......@@ -38,9 +38,8 @@ exceptions = [
"rt/isaac/randport.cpp", # public domain
"rt/isaac/rand.h", # public domain
"rt/isaac/standard.h", # public domain
"libstd/sync/mpsc_queue.rs", # BSD
"libstd/sync/spsc_queue.rs", # BSD
"libstd/sync/mpmc_bounded_queue.rs", # BSD
"libstd/comm/mpsc_queue.rs", # BSD
"libstd/comm/spsc_queue.rs", # BSD
"test/bench/shootout-binarytrees.rs", # BSD
"test/bench/shootout-chameneos-redux.rs", # BSD
"test/bench/shootout-fannkuch-redux.rs", # BSD
......
......@@ -354,6 +354,8 @@ mod $name {
mod shared;
mod stream;
mod sync;
mod mpsc_queue;
mod spsc_queue;
/// The receiving-half of Rust's channel type. This half can only be owned by
/// one task
......@@ -628,24 +630,26 @@ pub fn send_opt(&self, t: T) -> Result<(), T> {
#[unstable]
impl<T: Send> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let (packet, sleeper) = match *unsafe { self.inner() } {
let (packet, sleeper, guard) = match *unsafe { self.inner() } {
Oneshot(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
(*a.get()).postinit_lock();
let guard = (*a.get()).postinit_lock();
match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
oneshot::UpWoke(task) => (a, Some(task))
oneshot::UpSuccess |
oneshot::UpDisconnected => (a, None, guard),
oneshot::UpWoke(task) => (a, Some(task), guard)
}
}
}
Stream(ref p) => {
let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
unsafe {
(*a.get()).postinit_lock();
let guard = (*a.get()).postinit_lock();
match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
stream::UpSuccess | stream::UpDisconnected => (a, None),
stream::UpWoke(task) => (a, Some(task)),
stream::UpSuccess |
stream::UpDisconnected => (a, None, guard),
stream::UpWoke(task) => (a, Some(task), guard),
}
}
}
......@@ -657,7 +661,7 @@ fn clone(&self) -> Sender<T> {
};
unsafe {
(*packet.get()).inherit_blocker(sleeper);
(*packet.get()).inherit_blocker(sleeper, guard);
let tmp = Sender::new(Shared(packet.clone()));
mem::swap(self.inner_mut(), tmp.inner_mut());
......
......@@ -132,15 +132,6 @@ pub fn pop(&self) -> PopResult<T> {
if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
}
}
/// Attempts to pop data from this queue, but doesn't attempt too hard. This
/// will canonicalize inconsistent states to a `None` value.
pub fn casual_pop(&self) -> Option<T> {
match self.pop() {
Data(t) => Some(t),
Empty | Inconsistent => None,
}
}
}
#[unsafe_destructor]
......
......@@ -26,12 +26,11 @@
use core::cmp;
use core::int;
use rustrt::local::Local;
use rustrt::mutex::NativeMutex;
use rustrt::task::{Task, BlockedTask};
use rustrt::thread::Thread;
use sync::atomic;
use sync::mpsc_queue as mpsc;
use sync::{atomic, Mutex, MutexGuard};
use comm::mpsc_queue as mpsc;
const DISCONNECTED: int = int::MIN;
const FUDGE: int = 1024;
......@@ -56,7 +55,7 @@ pub struct Packet<T> {
// this lock protects various portions of this implementation during
// select()
select_lock: NativeMutex,
select_lock: Mutex<()>,
}
pub enum Failure {
......@@ -76,7 +75,7 @@ pub fn new() -> Packet<T> {
channels: atomic::AtomicInt::new(2),
port_dropped: atomic::AtomicBool::new(false),
sender_drain: atomic::AtomicInt::new(0),
select_lock: unsafe { NativeMutex::new() },
select_lock: Mutex::new(()),
};
return p;
}
......@@ -86,8 +85,8 @@ pub fn new() -> Packet<T> {
// Otherwise the mutex data would be duplicated while cloning,
// and that could cause problems on platforms where it is
// represented by an opaque data structure
pub fn postinit_lock(&mut self) {
unsafe { self.select_lock.lock_noguard() }
pub fn postinit_lock(&self) -> MutexGuard<()> {
self.select_lock.lock()
}
// This function is used at the creation of a shared packet to inherit a
......@@ -95,7 +94,9 @@ pub fn postinit_lock(&mut self) {
// tasks in select().
//
// This can only be called at channel-creation time
pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
pub fn inherit_blocker(&mut self,
task: Option<BlockedTask>,
guard: MutexGuard<()>) {
match task {
Some(task) => {
assert_eq!(self.cnt.load(atomic::SeqCst), 0);
......@@ -135,7 +136,7 @@ pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
// interfere with this method. After we unlock this lock, we're
// signifying that we're done modifying self.cnt and self.to_wake and
// the port is ready for the world to continue using it.
unsafe { self.select_lock.unlock_noguard() }
drop(guard);
}
pub fn send(&mut self, t: T) -> Result<(), T> {
......@@ -441,7 +442,7 @@ pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
// done with. Without this bounce, we can race with inherit_blocker
// about looking at and dealing with to_wake. Once we have acquired the
// lock, we are guaranteed that inherit_blocker is done.
unsafe {
{
let _guard = self.select_lock.lock();
}
......
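
The hunks above replace the raw `lock_noguard`/`unlock_noguard` pair with an
RAII guard that travels from `postinit_lock` to `inherit_blocker`, where
dropping it releases the lock. Schematically (a simplified sketch, not the
real `Packet` type):

```rust
use std::sync::{Mutex, MutexGuard};

struct Packet { select_lock: Mutex<()> }

impl Packet {
    // Acquire the lock and hand the guard to the caller...
    fn postinit_lock(&self) -> MutexGuard<()> {
        self.select_lock.lock()
    }

    // ...who passes it back once initialization is done. Dropping the
    // guard is the safe replacement for `unlock_noguard()`.
    fn inherit_blocker(&self, guard: MutexGuard<()>) {
        // ... set up cnt / to_wake while the lock is held ...
        drop(guard);
    }
}
```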
......@@ -40,7 +40,6 @@
use alloc::boxed::Box;
use core::mem;
use core::cell::UnsafeCell;
use alloc::arc::Arc;
use sync::atomic::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
......@@ -74,39 +73,6 @@ pub struct Queue<T> {
cache_subtractions: AtomicUint,
}
/// A safe abstraction for the consumer in a single-producer single-consumer
/// queue.
pub struct Consumer<T> {
inner: Arc<Queue<T>>
}
impl<T: Send> Consumer<T> {
/// Attempts to pop the value from the head of the queue, returning `None`
/// if the queue is empty.
pub fn pop(&mut self) -> Option<T> {
self.inner.pop()
}
/// Attempts to peek at the head of the queue, returning `None` if the queue
/// is empty.
pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
self.inner.peek()
}
}
/// A safe abstraction for the producer in a single-producer single-consumer
/// queue.
pub struct Producer<T> {
inner: Arc<Queue<T>>
}
impl<T: Send> Producer<T> {
/// Pushes a new value onto the queue.
pub fn push(&mut self, t: T) {
self.inner.push(t)
}
}
impl<T: Send> Node<T> {
fn new() -> *mut Node<T> {
unsafe {
......@@ -118,30 +84,6 @@ fn new() -> *mut Node<T> {
}
}
/// Creates a new queue with a consumer-producer pair.
///
/// The producer returned is connected to the consumer to push all data to
/// the consumer.
///
/// # Arguments
///
/// * `bound` - This queue implementation is implemented with a linked
/// list, and this means that a push is always a malloc. In
/// order to amortize this cost, an internal cache of nodes is
/// maintained to prevent a malloc from always being
/// necessary. This bound is the limit on the size of the
/// cache (if desired). If the value is 0, then the cache has
/// no bound. Otherwise, the cache will never grow larger than
`bound` (although the queue itself could be much larger).
pub fn queue<T: Send>(bound: uint) -> (Consumer<T>, Producer<T>) {
let q = unsafe { Queue::new(bound) };
let arc = Arc::new(q);
let consumer = Consumer { inner: arc.clone() };
let producer = Producer { inner: arc };
(consumer, producer)
}
impl<T: Send> Queue<T> {
/// Creates a new queue.
///
......@@ -296,78 +238,88 @@ fn drop(&mut self) {
mod test {
use prelude::*;
use super::{queue};
use sync::Arc;
use super::Queue;
#[test]
fn smoke() {
let (mut consumer, mut producer) = queue(0);
producer.push(1i);
producer.push(2);
assert_eq!(consumer.pop(), Some(1i));
assert_eq!(consumer.pop(), Some(2));
assert_eq!(consumer.pop(), None);
producer.push(3);
producer.push(4);
assert_eq!(consumer.pop(), Some(3));
assert_eq!(consumer.pop(), Some(4));
assert_eq!(consumer.pop(), None);
unsafe {
let queue = Queue::new(0);
queue.push(1i);
queue.push(2);
assert_eq!(queue.pop(), Some(1i));
assert_eq!(queue.pop(), Some(2));
assert_eq!(queue.pop(), None);
queue.push(3);
queue.push(4);
assert_eq!(queue.pop(), Some(3));
assert_eq!(queue.pop(), Some(4));
assert_eq!(queue.pop(), None);
}
}
#[test]
fn peek() {
let (mut consumer, mut producer) = queue(0);
producer.push(vec![1i]);
unsafe {
let queue = Queue::new(0);
queue.push(vec![1i]);
// Ensure the borrowchecker works
match queue.peek() {
Some(vec) => match vec.as_slice() {
// Note that `pop` is not allowed here due to borrow
[1] => {}
_ => return
},
None => unreachable!()
}
// Ensure the borrowchecker works
match consumer.peek() {
Some(vec) => match vec.as_slice() {
// Note that `pop` is not allowed here due to borrow
[1] => {}
_ => return
},
None => unreachable!()
queue.pop();
}
consumer.pop();
}
#[test]
fn drop_full() {
let (_, mut producer) = queue(0);
producer.push(box 1i);
producer.push(box 2i);
unsafe {
let q = Queue::new(0);
q.push(box 1i);
q.push(box 2i);
}
}
#[test]
fn smoke_bound() {
let (mut consumer, mut producer) = queue(1);
producer.push(1i);
producer.push(2);
assert_eq!(consumer.pop(), Some(1));
assert_eq!(consumer.pop(), Some(2));
assert_eq!(consumer.pop(), None);
producer.push(3);
producer.push(4);
assert_eq!(consumer.pop(), Some(3));
assert_eq!(consumer.pop(), Some(4));
assert_eq!(consumer.pop(), None);
unsafe {
let q = Queue::new(0);
q.push(1i);
q.push(2);
assert_eq!(q.pop(), Some(1));
assert_eq!(q.pop(), Some(2));
assert_eq!(q.pop(), None);
q.push(3);
q.push(4);
assert_eq!(q.pop(), Some(3));
assert_eq!(q.pop(), Some(4));
assert_eq!(q.pop(), None);
}
}
#[test]
fn stress() {
stress_bound(0);
stress_bound(1);
unsafe {
stress_bound(0);
stress_bound(1);
}
fn stress_bound(bound: uint) {
let (consumer, mut producer) = queue(bound);
unsafe fn stress_bound(bound: uint) {
let q = Arc::new(Queue::new(bound));
let (tx, rx) = channel();
let q2 = q.clone();
spawn(proc() {
// Move the consumer to a local mutable slot
let mut consumer = consumer;
for _ in range(0u, 100000) {
loop {
match consumer.pop() {
match q2.pop() {
Some(1i) => break,
Some(_) => panic!(),
None => {}
......@@ -377,7 +329,7 @@ fn stress_bound(bound: uint) {
tx.send(());
});
for _ in range(0i, 100000) {
producer.push(1);
q.push(1);
}
rx.recv();
}
......
......@@ -32,7 +32,7 @@
use rustrt::thread::Thread;
use sync::atomic;
use sync::spsc_queue as spsc;
use comm::spsc_queue as spsc;
use comm::Receiver;
const DISCONNECTED: int = int::MIN;
......
......@@ -225,8 +225,8 @@ pub unsafe fn open_internal() -> *mut u8 {
}
pub fn check_for_errors_in<T>(f: || -> T) -> Result<T, String> {
use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
use sync::{StaticMutex, MUTEX_INIT};
static LOCK: StaticMutex = MUTEX_INIT;
unsafe {
// dlerror isn't thread safe, so we need to lock around this entire
// sequence
......
......@@ -106,7 +106,7 @@
#![allow(unknown_features)]
#![feature(macro_rules, globs, linkage)]
#![feature(default_type_params, phase, lang_items, unsafe_destructor)]
#![feature(import_shadowing, slicing_syntax)]
#![feature(import_shadowing, slicing_syntax, tuple_indexing)]
// Don't link to std. We are std.
#![no_std]
......
......@@ -209,14 +209,12 @@ pub fn fill_utf16_buf_and_decode(f: |*mut u16, DWORD| -> DWORD)
Serialize access through a global lock.
*/
fn with_env_lock<T>(f: || -> T) -> T {
use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use sync::{StaticMutex, MUTEX_INIT};
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
static LOCK: StaticMutex = MUTEX_INIT;
unsafe {
let _guard = LOCK.lock();
f()
}
let _guard = LOCK.lock();
f()
}
/// Returns a vector of (variable, value) pairs, for all the environment
......
......@@ -238,7 +238,7 @@ mod imp {
use mem;
use option::{Some, None, Option};
use result::{Ok, Err};
use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use sync::{StaticMutex, MUTEX_INIT};
/// As always - iOS on arm uses SjLj exceptions and
/// _Unwind_Backtrace is even not available there. Still,
......@@ -264,8 +264,8 @@ fn backtrace(buf: *mut *mut libc::c_void,
// while it doesn't require a lock to work, as everything is
// local, it still displays much nicer backtraces when a
// couple of tasks panic simultaneously
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
let _g = unsafe { LOCK.lock() };
static LOCK: StaticMutex = MUTEX_INIT;
let _g = LOCK.lock();
try!(writeln!(w, "stack backtrace:"));
// 100 lines should be enough
......@@ -297,8 +297,8 @@ struct Context<'a> {
// is semi-reasonable in terms of printing anyway, and we know that all
// I/O done here is blocking I/O, not green I/O, so we don't have to
// worry about this being a native vs green mutex.
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
let _g = unsafe { LOCK.lock() };
static LOCK: StaticMutex = MUTEX_INIT;
let _g = LOCK.lock();
try!(writeln!(w, "stack backtrace:"));
......@@ -667,7 +667,7 @@ mod imp {
use option::{Some, None};
use path::Path;
use result::{Ok, Err};
use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use sync::{StaticMutex, MUTEX_INIT};
use slice::SlicePrelude;
use str::StrPrelude;
use dynamic_lib::DynamicLibrary;
......@@ -928,8 +928,8 @@ impl Drop for Cleanup {
pub fn write(w: &mut Writer) -> IoResult<()> {
// According to windows documentation, all dbghelp functions are
// single-threaded.
static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
let _g = unsafe { LOCK.lock() };
static LOCK: StaticMutex = MUTEX_INIT;
let _g = LOCK.lock();
// Open up dbghelp.dll, we don't link to it explicitly because it can't
// always be found. Additionally, it's nice having fewer dependencies.
......
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use sync::{Mutex, Condvar};
/// A barrier enables multiple tasks to synchronize the beginning
/// of some computation.
///
/// ```rust
/// use std::sync::{Arc, Barrier};
///
/// let barrier = Arc::new(Barrier::new(10));
/// for _ in range(0u, 10) {
/// let c = barrier.clone();
/// // The same messages will be printed together.
/// // You will NOT see any interleaving.
/// spawn(proc() {
/// println!("before wait");
/// c.wait();
/// println!("after wait");
/// });
/// }
/// ```
pub struct Barrier {
lock: Mutex<BarrierState>,
cvar: Condvar,
num_threads: uint,
}
// The inner state of a double barrier
struct BarrierState {
count: uint,
generation_id: uint,
}
impl Barrier {
/// Create a new barrier that can block a given number of threads.
///
/// A barrier will block `n`-1 threads which call `wait` and then wake up
/// all threads at once when the `n`th thread calls `wait`.
pub fn new(n: uint) -> Barrier {
Barrier {
lock: Mutex::new(BarrierState {
count: 0,
generation_id: 0,
}),
cvar: Condvar::new(),
num_threads: n,
}
}
/// Block the current thread until all threads have rendezvoused here.
///
/// Barriers are re-usable after all threads have rendezvoused once, and can
/// be used continuously.
pub fn wait(&self) {
let mut lock = self.lock.lock();
let local_gen = lock.generation_id;
lock.count += 1;
if lock.count < self.num_threads {
// We need a while loop to guard against spurious wakeups.
// http://en.wikipedia.org/wiki/Spurious_wakeup
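// The generation check also makes the barrier reusable: a thread woken
// after the final waiter resets `count` observes a new `generation_id`
// and exits the loop, even though `count` is again below `num_threads`.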
while local_gen == lock.generation_id &&
lock.count < self.num_threads {
self.cvar.wait(&lock);
}
} else {
lock.count = 0;
lock.generation_id += 1;
self.cvar.notify_all();
}
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use sync::{Arc, Barrier};
use comm::Empty;
#[test]
fn test_barrier() {
let barrier = Arc::new(Barrier::new(10));
let (tx, rx) = channel();
for _ in range(0u, 9) {
let c = barrier.clone();
let tx = tx.clone();
spawn(proc() {
c.wait();
tx.send(true);
});
}
// At this point, all spawned tasks should be blocked,
// so we shouldn't get anything from the port
assert!(match rx.try_recv() {
Err(Empty) => true,
_ => false,
});
barrier.wait();
// Now, the barrier is cleared and we should get data.
for _ in range(0u, 9) {
rx.recv();
}
}
}
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use sync::atomic::{mod, AtomicUint};
use sync::{mutex, StaticMutexGuard};
use sys_common::condvar as sys;
use sys_common::mutex as sys_mutex;
use time::Duration;
/// A Condition Variable
///
/// Condition variables represent the ability to block a thread such that it
/// consumes no CPU time while waiting for an event to occur. Condition
/// variables are typically associated with a boolean predicate (a condition)
/// and a mutex. The predicate is always verified inside of the mutex before
/// determining that the thread must block.
///
/// Functions in this module will block the current **thread** of execution and
/// are bindings to system-provided condition variables where possible. Note
/// that this module places one additional restriction over the system condition
/// variables: each condvar can be used with precisely one mutex at runtime. Any
/// attempt to use multiple mutexes on the same condition variable will result
/// in a runtime panic. If this is not desired, then the unsafe primitives in
/// `sys` do not have this restriction but may result in undefined behavior.
///
/// # Example
///
/// ```
/// use std::sync::{Arc, Mutex, Condvar};
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// // Inside of our lock, spawn a new thread, and then wait for it to start
/// spawn(proc() {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock();
/// *started = true;
/// cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let started = lock.lock();
/// while !*started {
/// cvar.wait(&started);
/// }
/// ```
pub struct Condvar { inner: Box<StaticCondvar> }
/// Statically allocated condition variables.
///
/// This structure is identical to `Condvar` except that it is suitable for use
/// in static initializers for other structures.
///
/// # Example
///
/// ```
/// use std::sync::{StaticCondvar, CONDVAR_INIT};
///
/// static CVAR: StaticCondvar = CONDVAR_INIT;
/// ```
pub struct StaticCondvar {
inner: sys::Condvar,
mutex: AtomicUint,
}
/// Constant initializer for a statically allocated condition variable.
pub const CONDVAR_INIT: StaticCondvar = StaticCondvar {
inner: sys::CONDVAR_INIT,
mutex: atomic::INIT_ATOMIC_UINT,
};
/// A trait for values which can be passed to the waiting methods of condition
/// variables. This is implemented by the mutex guards in this module.
///
/// Note that this trait should likely not be implemented manually unless you
/// really know what you're doing.
pub trait AsMutexGuard {
#[allow(missing_docs)]
unsafe fn as_mutex_guard(&self) -> &StaticMutexGuard;
}
impl Condvar {
/// Creates a new condition variable which is ready to be waited on and
/// notified.
pub fn new() -> Condvar {
Condvar {
inner: box StaticCondvar {
inner: unsafe { sys::Condvar::new() },
mutex: AtomicUint::new(0),
}
}
}
/// Block the current thread until this condition variable receives a
/// notification.
///
/// This function will atomically unlock the mutex specified (represented by
/// `guard`) and block the current thread. This means that any calls to
/// `notify_*()` which happen logically after the mutex is unlocked are
/// candidates to wake this thread up. When this function call returns, the
/// lock specified will have been re-acquired.
///
/// Note that this function is susceptible to spurious wakeups. Condition
/// variables normally have a boolean predicate associated with them, and
/// the predicate must always be checked each time this function returns to
/// protect against spurious wakeups.
///
/// # Panics
///
/// This function will `panic!()` if it is used with more than one mutex
/// over time. Each condition variable is dynamically bound to exactly one
/// mutex to ensure defined behavior across platforms. If this functionality
/// is not desired, then unsafe primitives in `sys` are provided.
pub fn wait<T: AsMutexGuard>(&self, mutex_guard: &T) {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait(mutex_guard)
}
}
/// Wait on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The semantics of this function are equivalent to `wait()` except that
/// the thread will be blocked for roughly no longer than `dur`. This method
/// should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum amount
/// of time waited to be precisely `dur`.
///
/// If the wait timed out, then `false` will be returned. Otherwise if a
/// notification was received then `true` will be returned.
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
// Note that this method is *not* public, and this is quite intentional
// because we're not quite sure about the semantics of relative vs absolute
// durations or how the timing guarantees play into what the system APIs
// provide. There are also additional concerns about the unix-specific
// implementation which may need to be addressed.
#[allow(dead_code)]
fn wait_timeout<T: AsMutexGuard>(&self, mutex_guard: &T,
dur: Duration) -> bool {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait_timeout(mutex_guard, dur)
}
}
/// Wake up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
/// be woken up from its call to `wait` or `wait_timeout`. Calls to
/// `notify_one` are not buffered in any way.
///
/// To wake up all threads, see `notify_all()`.
pub fn notify_one(&self) { unsafe { self.inner.inner.notify_one() } }
/// Wake up all blocked threads on this condvar.
///
/// This method will ensure that any current waiters on the condition
/// variable are awoken. Calls to `notify_all()` are not buffered in any
/// way.
///
/// To wake up only one thread, see `notify_one()`.
pub fn notify_all(&self) { unsafe { self.inner.inner.notify_all() } }
}
impl Drop for Condvar {
fn drop(&mut self) {
unsafe { self.inner.inner.destroy() }
}
}
impl StaticCondvar {
/// Block the current thread until this condition variable receives a
/// notification.
///
/// See `Condvar::wait`.
pub fn wait<T: AsMutexGuard>(&'static self, mutex_guard: &T) {
unsafe {
let lock = mutex_guard.as_mutex_guard();
let sys = mutex::guard_lock(lock);
self.verify(sys);
self.inner.wait(sys);
(*mutex::guard_poison(lock)).check("mutex");
}
}
/// Wait on this condition variable for a notification, timing out after a
/// specified duration.
///
/// See `Condvar::wait_timeout`.
#[allow(dead_code)] // may want to stabilize this later, see wait_timeout above
fn wait_timeout<T: AsMutexGuard>(&'static self, mutex_guard: &T,
dur: Duration) -> bool {
unsafe {
let lock = mutex_guard.as_mutex_guard();
let sys = mutex::guard_lock(lock);
self.verify(sys);
let ret = self.inner.wait_timeout(sys, dur);
(*mutex::guard_poison(lock)).check("mutex");
return ret;
}
}
/// Wake up one blocked thread on this condvar.
///
/// See `Condvar::notify_one`.
pub fn notify_one(&'static self) { unsafe { self.inner.notify_one() } }
/// Wake up all blocked threads on this condvar.
///
/// See `Condvar::notify_all`.
pub fn notify_all(&'static self) { unsafe { self.inner.notify_all() } }
/// Deallocate all resources associated with this static condvar.
///
/// This method is unsafe to call as there is no guarantee that there are no
/// active users of the condvar, and this also doesn't prevent any future
/// users of the condvar. This method is required to be called to not leak
/// memory on all platforms.
pub unsafe fn destroy(&'static self) {
self.inner.destroy()
}
fn verify(&self, mutex: &sys_mutex::Mutex) {
let addr = mutex as *const _ as uint;
match self.mutex.compare_and_swap(0, addr, atomic::SeqCst) {
// If we got out 0, then we have successfully bound the mutex to
// this cvar.
0 => {}
// If we get out a value that's the same as `addr`, then someone
// already beat us to the punch.
n if n == addr => {}
// Anything else and we're using more than one mutex on this cvar,
// which is currently disallowed.
_ => panic!("attempted to use a condition variable with two \
mutexes"),
}
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use time::Duration;
use super::{StaticCondvar, CONDVAR_INIT};
use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
#[test]
fn smoke() {
let c = Condvar::new();
c.notify_one();
c.notify_all();
}
#[test]
fn static_smoke() {
static C: StaticCondvar = CONDVAR_INIT;
C.notify_one();
C.notify_all();
unsafe { C.destroy(); }
}
#[test]
fn notify_one() {
static C: StaticCondvar = CONDVAR_INIT;
static M: StaticMutex = MUTEX_INIT;
let g = M.lock();
spawn(proc() {
let _g = M.lock();
C.notify_one();
});
C.wait(&g);
drop(g);
unsafe { C.destroy(); M.destroy(); }
}
#[test]
fn notify_all() {
const N: uint = 10;
let data = Arc::new((Mutex::new(0), Condvar::new()));
let (tx, rx) = channel();
for _ in range(0, N) {
let data = data.clone();
let tx = tx.clone();
spawn(proc() {
let &(ref lock, ref cond) = &*data;
let mut cnt = lock.lock();
*cnt += 1;
if *cnt == N {
tx.send(());
}
while *cnt != 0 {
cond.wait(&cnt);
}
tx.send(());
});
}
drop(tx);
let &(ref lock, ref cond) = &*data;
rx.recv();
let mut cnt = lock.lock();
*cnt = 0;
cond.notify_all();
drop(cnt);
for _ in range(0, N) {
rx.recv();
}
}
#[test]
fn wait_timeout() {
static C: StaticCondvar = CONDVAR_INIT;
static M: StaticMutex = MUTEX_INIT;
let g = M.lock();
assert!(!C.wait_timeout(&g, Duration::nanoseconds(1000)));
spawn(proc() {
let _g = M.lock();
C.notify_one();
});
assert!(C.wait_timeout(&g, Duration::days(1)));
drop(g);
unsafe { C.destroy(); M.destroy(); }
}
#[test]
#[should_fail]
fn two_mutexes() {
static M1: StaticMutex = MUTEX_INIT;
static M2: StaticMutex = MUTEX_INIT;
static C: StaticCondvar = CONDVAR_INIT;
let g = M1.lock();
spawn(proc() {
let _g = M1.lock();
C.notify_one();
});
C.wait(&g);
drop(g);
C.wait(&M2.lock());
}
}
This diff has been collapsed.
......@@ -148,7 +148,7 @@ mod test {
use prelude::*;
use sync::Future;
use task;
use comm::{channel, Sender};
use comm::channel;
#[test]
fn test_from_value() {
......
This diff has been collapsed.
......@@ -17,41 +17,27 @@
#![experimental]
pub use self::one::{Once, ONCE_INIT};
pub use alloc::arc::{Arc, Weak};
pub use self::lock::{Mutex, MutexGuard, Condvar, Barrier,
RWLock, RWLockReadGuard, RWLockWriteGuard};
// The mutex/rwlock in this module are not meant for reexport
pub use self::raw::{Semaphore, SemaphoreGuard};
pub use self::mutex::{Mutex, MutexGuard, StaticMutex, StaticMutexGuard, MUTEX_INIT};
pub use self::rwlock::{RWLock, StaticRWLock, RWLOCK_INIT};
pub use self::rwlock::{RWLockReadGuard, RWLockWriteGuard};
pub use self::rwlock::{StaticRWLockReadGuard, StaticRWLockWriteGuard};
pub use self::condvar::{Condvar, StaticCondvar, CONDVAR_INIT, AsMutexGuard};
pub use self::once::{Once, ONCE_INIT};
pub use self::semaphore::{Semaphore, SemaphoreGuard};
pub use self::barrier::Barrier;
pub use self::future::Future;
pub use self::task_pool::TaskPool;
// Core building blocks for all primitives in this crate
#[stable]
pub mod atomic;
// Concurrent data structures
pub mod spsc_queue;
pub mod mpsc_queue;
pub mod mpmc_bounded_queue;
pub mod deque;
// Low-level concurrency primitives
mod raw;
mod mutex;
mod one;
// Higher level primitives based on those above
mod lock;
// Task management
mod barrier;
mod condvar;
mod future;
mod mutex;
mod once;
mod poison;
mod rwlock;
mod semaphore;
mod task_pool;
/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and documentation are
* those of the authors and should not be interpreted as representing official
* policies, either expressed or implied, of Dmitry Vyukov.
*/
#![experimental]
#![allow(missing_docs, dead_code)]
// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
use core::prelude::*;
use alloc::arc::Arc;
use vec::Vec;
use core::num::UnsignedInt;
use core::cell::UnsafeCell;
use sync::atomic::{AtomicUint,Relaxed,Release,Acquire};
struct Node<T> {
sequence: AtomicUint,
value: Option<T>,
}
struct State<T> {
pad0: [u8, ..64],
buffer: Vec<UnsafeCell<Node<T>>>,
mask: uint,
pad1: [u8, ..64],
enqueue_pos: AtomicUint,
pad2: [u8, ..64],
dequeue_pos: AtomicUint,
pad3: [u8, ..64],
}
pub struct Queue<T> {
state: Arc<State<T>>,
}
impl<T: Send> State<T> {
fn with_capacity(capacity: uint) -> State<T> {
let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
if capacity < 2 {
2u
} else {
// use next power of 2 as capacity
capacity.next_power_of_two()
}
} else {
capacity
};
let buffer = Vec::from_fn(capacity, |i| {
UnsafeCell::new(Node { sequence:AtomicUint::new(i), value: None })
});
State{
pad0: [0, ..64],
buffer: buffer,
mask: capacity-1,
pad1: [0, ..64],
enqueue_pos: AtomicUint::new(0),
pad2: [0, ..64],
dequeue_pos: AtomicUint::new(0),
pad3: [0, ..64],
}
}
fn push(&self, value: T) -> bool {
let mask = self.mask;
let mut pos = self.enqueue_pos.load(Relaxed);
loop {
let node = &self.buffer[pos & mask];
let seq = unsafe { (*node.get()).sequence.load(Acquire) };
let diff: int = seq as int - pos as int;
if diff == 0 {
let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
if enqueue_pos == pos {
unsafe {
(*node.get()).value = Some(value);
(*node.get()).sequence.store(pos+1, Release);
}
break
} else {
pos = enqueue_pos;
}
} else if diff < 0 {
return false
} else {
pos = self.enqueue_pos.load(Relaxed);
}
}
true
}
fn pop(&self) -> Option<T> {
let mask = self.mask;
let mut pos = self.dequeue_pos.load(Relaxed);
loop {
let node = &self.buffer[pos & mask];
let seq = unsafe { (*node.get()).sequence.load(Acquire) };
let diff: int = seq as int - (pos + 1) as int;
if diff == 0 {
let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
if dequeue_pos == pos {
unsafe {
let value = (*node.get()).value.take();
(*node.get()).sequence.store(pos + mask + 1, Release);
return value
}
} else {
pos = dequeue_pos;
}
} else if diff < 0 {
return None
} else {
pos = self.dequeue_pos.load(Relaxed);
}
}
}
}
impl<T: Send> Queue<T> {
pub fn with_capacity(capacity: uint) -> Queue<T> {
Queue{
state: Arc::new(State::with_capacity(capacity))
}
}
pub fn push(&self, value: T) -> bool {
self.state.push(value)
}
pub fn pop(&self) -> Option<T> {
self.state.pop()
}
}
impl<T: Send> Clone for Queue<T> {
fn clone(&self) -> Queue<T> {
Queue { state: self.state.clone() }
}
}
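
For context, a minimal single-threaded sketch of how the `Queue` above
behaves, assuming it is in scope (capacity rounds up to a power of two, and
`push` fails rather than blocking when full):

```rust
fn demo() {
    let q = Queue::with_capacity(2);
    assert!(q.push(1i));
    assert!(q.push(2i));
    assert!(!q.push(3i));          // full: push returns false
    assert_eq!(q.pop(), Some(1));
    let q2 = q.clone();            // clones share the same underlying state
    assert!(q2.push(3i));
}
```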
#[cfg(test)]
mod tests {
use prelude::*;
use super::Queue;
#[test]
fn test() {
let nthreads = 8u;
let nmsgs = 1000u;
let q = Queue::with_capacity(nthreads*nmsgs);
assert_eq!(None, q.pop());
let (tx, rx) = channel();
for _ in range(0, nthreads) {
let q = q.clone();
let tx = tx.clone();
spawn(proc() {
let q = q;
for i in range(0, nmsgs) {
assert!(q.push(i));
}
tx.send(());
});
}
let mut completion_rxs = vec![];
for _ in range(0, nthreads) {
let (tx, rx) = channel();
completion_rxs.push(rx);
let q = q.clone();
spawn(proc() {
let q = q;
let mut i = 0u;
loop {
match q.pop() {
None => {},
Some(_) => {
i += 1;
if i == nmsgs { break }
}
}
}
tx.send(i);
});
}
for rx in completion_rxs.iter_mut() {
assert_eq!(nmsgs, rx.recv());
}
for _ in range(0, nthreads) {
rx.recv();
}
}
}
......@@ -8,43 +8,68 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A simple native mutex implementation. Warning: this API is likely
//! to change soon.
use prelude::*;
#![allow(dead_code)]
use core::prelude::*;
use alloc::boxed::Box;
use rustrt::mutex;
pub const LOCKED: uint = 1 << 0;
pub const BLOCKED: uint = 1 << 1;
use cell::UnsafeCell;
use kinds::marker;
use sync::{poison, AsMutexGuard};
use sys_common::mutex as sys;
/// A mutual exclusion primitive useful for protecting shared data
///
/// This mutex will properly block tasks waiting for the lock to become
/// available. The mutex can also be statically initialized or created via a
/// `new` constructor.
/// This mutex will block threads waiting for the lock to become available. The
/// mutex can also be statically initialized or created via a `new`
/// constructor. Each mutex has a type parameter which represents the data that
/// it is protecting. The data can only be accessed through the RAII guards
/// returned from `lock` and `try_lock`, which guarantees that the data is only
/// ever accessed when the mutex is locked.
///
/// # Poisoning
///
/// In order to prevent access to otherwise invalid data, each mutex will
/// propagate any panics which occur while the lock is held. Once a thread has
/// panicked while holding the lock, then all other threads will immediately
/// panic as well once they hold the lock.
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::mutex::Mutex;
/// ```rust
/// use std::sync::{Arc, Mutex};
/// const N: uint = 10;
///
/// let m = Mutex::new();
/// let guard = m.lock();
/// // do some work
/// drop(guard); // unlock the lock
/// // Spawn a few threads to increment a shared variable (non-atomically), and
/// // let the main thread know once all increments are done.
/// //
/// // Here we're using an Arc to share memory among tasks, and the data inside
/// // the Arc is protected with a mutex.
/// let data = Arc::new(Mutex::new(0));
///
/// let (tx, rx) = channel();
/// for _ in range(0u, 10) {
/// let (data, tx) = (data.clone(), tx.clone());
/// spawn(proc() {
/// // The shared static can only be accessed once the lock is held.
/// // Our non-atomic increment is safe because we're the only thread
/// // which can access the shared state when the lock is held.
/// let mut data = data.lock();
/// *data += 1;
/// if *data == N {
/// tx.send(());
/// }
/// // the lock is unlocked here when `data` goes out of scope.
/// });
/// }
///
/// rx.recv();
/// ```
pub struct Mutex {
pub struct Mutex<T> {
// Note that this static mutex is in a *box*, not inlined into the struct
// itself. This is done for memory safety reasons with the usage of a
// StaticNativeMutex inside the static mutex above. Once a native mutex has
// been used once, its address can never change (it can't be moved). This
// mutex type can be safely moved at any time, so to ensure that the native
// mutex is used correctly we box the inner lock to give it a constant
// address.
lock: Box<StaticMutex>,
// itself. Once a native mutex has been used once, its address can never
// change (it can't be moved). This mutex type can be safely moved at any
// time, so to ensure that the native mutex is used correctly we box the
// inner lock to give it a constant address.
inner: Box<StaticMutex>,
data: UnsafeCell<T>,
}
/// The static mutex type is provided to allow for static allocation of mutexes.
......@@ -57,8 +82,8 @@ pub struct Mutex {
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::mutex::{StaticMutex, MUTEX_INIT};
/// ```rust
/// use std::sync::{StaticMutex, MUTEX_INIT};
///
/// static LOCK: StaticMutex = MUTEX_INIT;
///
......@@ -69,35 +94,113 @@ pub struct Mutex {
/// // lock is unlocked here.
/// ```
pub struct StaticMutex {
lock: mutex::StaticNativeMutex,
lock: sys::Mutex,
poison: UnsafeCell<poison::Flag>,
}
/// An RAII implementation of a "scoped lock" of a mutex. When this structure is
/// dropped (falls out of scope), the lock will be unlocked.
///
/// The data protected by the mutex can be accessed through this guard via its
/// Deref and DerefMut implementations.
#[must_use]
pub struct Guard<'a> {
guard: mutex::LockGuard<'a>,
pub struct MutexGuard<'a, T: 'a> {
// funny underscores due to how Deref/DerefMut currently work (they
// disregard field privacy).
__lock: &'a Mutex<T>,
__guard: StaticMutexGuard,
}
fn lift_guard(guard: mutex::LockGuard) -> Guard {
Guard { guard: guard }
/// An RAII implementation of a "scoped lock" of a static mutex. When this
/// structure is dropped (falls out of scope), the lock will be unlocked.
#[must_use]
pub struct StaticMutexGuard {
lock: &'static sys::Mutex,
marker: marker::NoSend,
poison: poison::Guard<'static>,
}
/// Static initialization of a mutex. This constant can be used to initialize
/// other mutex constants.
pub const MUTEX_INIT: StaticMutex = StaticMutex {
lock: mutex::NATIVE_MUTEX_INIT
lock: sys::MUTEX_INIT,
poison: UnsafeCell { value: poison::Flag { failed: false } },
};
impl StaticMutex {
/// Attempts to grab this lock, see `Mutex::try_lock`
pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
unsafe { self.lock.trylock().map(lift_guard) }
impl<T: Send> Mutex<T> {
/// Creates a new mutex in an unlocked state ready for use.
pub fn new(t: T) -> Mutex<T> {
Mutex {
inner: box MUTEX_INIT,
data: UnsafeCell::new(t),
}
}
/// Acquires a mutex, blocking the current task until it is able to do so.
///
/// This function will block the local task until it is able to acquire
/// the mutex. Upon returning, the task is the only task with the mutex
/// held. An RAII guard is returned to allow scoped unlock of the lock. When
/// the guard goes out of scope, the mutex will be unlocked.
///
/// # Panics
///
/// If another user of this mutex panicked while holding the mutex, then
/// this call will immediately panic once the mutex is acquired.
pub fn lock(&self) -> MutexGuard<T> {
unsafe {
let lock: &'static StaticMutex = &*(&*self.inner as *const _);
MutexGuard::new(self, lock.lock())
}
}
/// Attempts to acquire this lock.
///
/// If the lock could not be acquired at this time, then `None` is returned.
/// Otherwise, an RAII guard is returned. The lock will be unlocked when the
/// guard is dropped.
///
/// This function does not block.
///
/// # Panics
///
/// If another user of this mutex panicked while holding the mutex, then
/// this call will immediately panic if the mutex would otherwise be
/// acquired.
pub fn try_lock(&self) -> Option<MutexGuard<T>> {
unsafe {
let lock: &'static StaticMutex = &*(&*self.inner as *const _);
lock.try_lock().map(|guard| {
MutexGuard::new(self, guard)
})
}
}
}
#[unsafe_destructor]
impl<T: Send> Drop for Mutex<T> {
fn drop(&mut self) {
// This is actually safe b/c we know that there is no further usage of
// this mutex (it's up to the user to arrange for a mutex to get
// dropped, that's not our job)
unsafe { self.inner.lock.destroy() }
}
}
impl StaticMutex {
/// Acquires this lock, see `Mutex::lock`
pub fn lock<'a>(&'a self) -> Guard<'a> {
lift_guard(unsafe { self.lock.lock() })
pub fn lock(&'static self) -> StaticMutexGuard {
unsafe { self.lock.lock() }
StaticMutexGuard::new(self)
}
/// Attempts to grab this lock, see `Mutex::try_lock`
pub fn try_lock(&'static self) -> Option<StaticMutexGuard> {
if unsafe { self.lock.try_lock() } {
Some(StaticMutexGuard::new(self))
} else {
None
}
}
/// Deallocates resources associated with this static mutex.
......@@ -110,58 +213,73 @@ pub fn lock<'a>(&'a self) -> Guard<'a> {
/// *all* platforms. It may be the case that some platforms do not leak
/// memory if this method is not called, but this is not guaranteed to be
/// true on all platforms.
pub unsafe fn destroy(&self) {
pub unsafe fn destroy(&'static self) {
self.lock.destroy()
}
}
impl Mutex {
/// Creates a new mutex in an unlocked state ready for use.
pub fn new() -> Mutex {
Mutex {
lock: box StaticMutex {
lock: unsafe { mutex::StaticNativeMutex::new() },
}
}
impl<'mutex, T> MutexGuard<'mutex, T> {
fn new(lock: &Mutex<T>, guard: StaticMutexGuard) -> MutexGuard<T> {
MutexGuard { __lock: lock, __guard: guard }
}
}
/// Attempts to acquire this lock.
///
/// If the lock could not be acquired at this time, then `None` is returned.
/// Otherwise, an RAII guard is returned. The lock will be unlocked when the
/// guard is dropped.
///
/// This function does not block.
pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
self.lock.try_lock()
impl<'mutex, T> AsMutexGuard for MutexGuard<'mutex, T> {
unsafe fn as_mutex_guard(&self) -> &StaticMutexGuard { &self.__guard }
}
impl<'mutex, T> Deref<T> for MutexGuard<'mutex, T> {
fn deref<'a>(&'a self) -> &'a T { unsafe { &*self.__lock.data.get() } }
}
impl<'mutex, T> DerefMut<T> for MutexGuard<'mutex, T> {
fn deref_mut<'a>(&'a mut self) -> &'a mut T {
unsafe { &mut *self.__lock.data.get() }
}
}
/// Acquires a mutex, blocking the current task until it is able to do so.
///
/// This function will block the local task until it is available to acquire
/// the mutex. Upon returning, the task is the only task with the mutex
/// held. An RAII guard is returned to allow scoped unlock of the lock. When
/// the guard goes out of scope, the mutex will be unlocked.
pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() }
impl StaticMutexGuard {
fn new(lock: &'static StaticMutex) -> StaticMutexGuard {
unsafe {
let guard = StaticMutexGuard {
lock: &lock.lock,
marker: marker::NoSend,
poison: (*lock.poison.get()).borrow(),
};
guard.poison.check("mutex");
return guard;
}
}
}
pub fn guard_lock(guard: &StaticMutexGuard) -> &sys::Mutex { guard.lock }
pub fn guard_poison(guard: &StaticMutexGuard) -> &poison::Guard {
&guard.poison
}
impl AsMutexGuard for StaticMutexGuard {
unsafe fn as_mutex_guard(&self) -> &StaticMutexGuard { self }
}
impl Drop for Mutex {
#[unsafe_destructor]
impl Drop for StaticMutexGuard {
fn drop(&mut self) {
// This is actually safe b/c we know that there is no further usage of
// this mutex (it's up to the user to arrange for a mutex to get
// dropped, that's not our job)
unsafe { self.lock.destroy() }
unsafe {
self.poison.done();
self.lock.unlock();
}
}
}
#[cfg(test)]
mod test {
use prelude::*;
use super::{Mutex, StaticMutex, MUTEX_INIT};
use task;
use sync::{Arc, Mutex, StaticMutex, MUTEX_INIT, Condvar};
#[test]
fn smoke() {
let m = Mutex::new();
let m = Mutex::new(());
drop(m.lock());
drop(m.lock());
}
......@@ -211,8 +329,104 @@ fn inc() {
}
#[test]
fn trylock() {
let m = Mutex::new();
fn try_lock() {
let m = Mutex::new(());
assert!(m.try_lock().is_some());
}
#[test]
fn test_mutex_arc_condvar() {
let arc = Arc::new((Mutex::new(false), Condvar::new()));
let arc2 = arc.clone();
let (tx, rx) = channel();
spawn(proc() {
// wait until parent gets in
rx.recv();
let &(ref lock, ref cvar) = &*arc2;
let mut lock = lock.lock();
*lock = true;
cvar.notify_one();
});
let &(ref lock, ref cvar) = &*arc;
let lock = lock.lock();
tx.send(());
assert!(!*lock);
while !*lock {
cvar.wait(&lock);
}
}
#[test]
#[should_fail]
fn test_arc_condvar_poison() {
let arc = Arc::new((Mutex::new(1i), Condvar::new()));
let arc2 = arc.clone();
let (tx, rx) = channel();
spawn(proc() {
rx.recv();
let &(ref lock, ref cvar) = &*arc2;
let _g = lock.lock();
cvar.notify_one();
// Parent should fail when it wakes up.
panic!();
});
let &(ref lock, ref cvar) = &*arc;
let lock = lock.lock();
tx.send(());
while *lock == 1 {
cvar.wait(&lock);
}
}
#[test]
#[should_fail]
fn test_mutex_arc_poison() {
let arc = Arc::new(Mutex::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.lock();
assert_eq!(*lock, 2);
});
let lock = arc.lock();
assert_eq!(*lock, 1);
}
#[test]
fn test_mutex_arc_nested() {
// Tests nested mutexes and access
// to underlying data.
let arc = Arc::new(Mutex::new(1i));
let arc2 = Arc::new(Mutex::new(arc));
let (tx, rx) = channel();
spawn(proc() {
let lock = arc2.lock();
let lock2 = lock.deref().lock();
assert_eq!(*lock2, 1);
tx.send(());
});
rx.recv();
}
#[test]
fn test_mutex_arc_access_in_unwind() {
let arc = Arc::new(Mutex::new(1i));
let arc2 = arc.clone();
let _ = task::try::<()>(proc() {
struct Unwinder {
i: Arc<Mutex<int>>,
}
impl Drop for Unwinder {
fn drop(&mut self) {
*self.i.lock() += 1;
}
}
let _u = Unwinder { i: arc2 };
panic!();
});
let lock = arc.lock();
assert_eq!(*lock, 2);
}
}
......@@ -13,12 +13,10 @@
//! This primitive is meant to be used to run one-time initialization. An
//! example use case would be for initializing an FFI library.
use core::prelude::*;
use core::int;
use core::atomic;
use super::mutex::{StaticMutex, MUTEX_INIT};
use int;
use mem::drop;
use sync::atomic;
use sync::{StaticMutex, MUTEX_INIT};
/// A synchronization primitive which can be used to run a one-time global
/// initialization. Useful for one-time initialization for FFI or related
......@@ -27,8 +25,8 @@
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::one::{Once, ONCE_INIT};
/// ```rust
/// use std::sync::{Once, ONCE_INIT};
///
/// static START: Once = ONCE_INIT;
///
......@@ -59,7 +57,7 @@ impl Once {
///
/// When this function returns, it is guaranteed that some initialization
/// has run and completed (it may not be the closure specified).
pub fn doit(&self, f: ||) {
pub fn doit(&'static self, f: ||) {
// Optimize common path: load is much cheaper than fetch_add.
if self.cnt.load(atomic::SeqCst) < 0 {
return
......@@ -121,6 +119,7 @@ pub fn doit(&self, f: ||) {
#[cfg(test)]
mod test {
use prelude::*;
use task;
use super::{ONCE_INIT, Once};
......
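
A compact sketch of the `Once` pattern shown in the docs above (the closure
body is a placeholder):

```rust
use std::sync::{Once, ONCE_INIT};

static START: Once = ONCE_INIT;

fn init_library() {
    START.doit(|| {
        // one-time initialization goes here; concurrent callers block
        // until some initialization has run to completion
    });
}
```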
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::None;
use rustrt::task::Task;
use rustrt::local::Local;
pub struct Flag { pub failed: bool }
impl Flag {
pub fn borrow(&mut self) -> Guard {
Guard { flag: &mut self.failed, failing: failing() }
}
}
pub struct Guard<'a> {
flag: &'a mut bool,
failing: bool,
}
impl<'a> Guard<'a> {
pub fn check(&self, name: &str) {
if *self.flag {
panic!("poisoned {} - another task failed inside", name);
}
}
pub fn done(&mut self) {
if !self.failing && failing() {
*self.flag = true;
}
}
}
fn failing() -> bool {
if Local::exists(None::<Task>) {
Local::borrow(None::<Task>).unwinder.unwinding()
} else {
false
}
}
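
Tying the pieces together: `Flag::borrow` snapshots the unwinding state when
a lock is acquired, `Guard::check` panics if the flag is already set, and
`Guard::done` sets the flag when the holder unlocks while panicking. A hedged
sketch of the observable behavior through the public API (it mirrors
`test_mutex_arc_poison` in the mutex tests earlier in this diff):

```rust
use std::sync::{Arc, Mutex};
use std::task;

fn main() {
    let m = Arc::new(Mutex::new(0i));
    let m2 = m.clone();
    // Panic while holding the lock: the unwinding holder poisons it.
    let _ = task::try(proc() {
        let _g = m2.lock();
        panic!("poison the mutex");
    });
    // Any subsequent lock() now panics on the poisoned mutex:
    // let _ = m.lock();
}
```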
This diff has been collapsed.
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use kinds::marker;
use cell::UnsafeCell;
use sys_common::rwlock as sys;
use sync::poison;
/// A reader-writer lock
///
/// This type of lock allows a number of readers or at most one writer at any
/// point in time. The write portion of this lock typically allows modification
/// of the underlying data (exclusive access) and the read portion of this lock
/// typically allows for read-only access (shared access).
///
/// The type parameter `T` represents the data that this lock protects. It is
/// required that `T` satisfies `Send` to be shared across tasks and `Sync` to
/// allow concurrent access through readers. The RAII guards returned from the
/// locking methods implement `Deref` (and `DerefMut` for the `write` methods)
/// to allow access to the contents of the lock.
///
/// RWLocks, like Mutexes, will become poisoned on panics. Note, however, that
/// an RWLock may only be poisoned if a panic occurs while it is locked
/// exclusively (write mode). If a panic occurs in any reader, then the lock
/// will not be poisoned.
///
/// # Example
///
/// ```
/// use std::sync::RWLock;
///
/// let lock = RWLock::new(5i);
///
/// // many reader locks can be held at once
/// {
/// let r1 = lock.read();
/// let r2 = lock.read();
/// assert_eq!(*r1, 5);
/// assert_eq!(*r2, 5);
/// } // read locks are dropped at this point
///
/// // only one write lock may be held, however
/// {
/// let mut w = lock.write();
/// *w += 1;
/// assert_eq!(*w, 6);
/// } // write lock is dropped here
/// ```
pub struct RWLock<T> {
inner: Box<StaticRWLock>,
data: UnsafeCell<T>,
}
/// Structure representing a statically allocated RWLock.
///
/// This structure is intended to be used inside of a `static` and will provide
/// automatic global access as well as lazy initialization. The internal
/// resources of this RWLock, however, must be manually deallocated.
///
/// # Example
///
/// ```
/// use std::sync::{StaticRWLock, RWLOCK_INIT};
///
/// static LOCK: StaticRWLock = RWLOCK_INIT;
///
/// {
/// let _g = LOCK.read();
/// // ... shared read access
/// }
/// {
/// let _g = LOCK.write();
/// // ... exclusive write access
/// }
/// unsafe { LOCK.destroy() } // free all resources
/// ```
pub struct StaticRWLock {
inner: sys::RWLock,
poison: UnsafeCell<poison::Flag>,
}
/// Constant initialization for a statically-initialized rwlock.
pub const RWLOCK_INIT: StaticRWLock = StaticRWLock {
inner: sys::RWLOCK_INIT,
poison: UnsafeCell { value: poison::Flag { failed: false } },
};
/// RAII structure used to release the shared read access of a lock when
/// dropped.
#[must_use]
pub struct RWLockReadGuard<'a, T: 'a> {
__lock: &'a RWLock<T>,
__guard: StaticRWLockReadGuard,
}
/// RAII structure used to release the exclusive write access of a lock when
/// dropped.
#[must_use]
pub struct RWLockWriteGuard<'a, T: 'a> {
__lock: &'a RWLock<T>,
__guard: StaticRWLockWriteGuard,
}
/// RAII structure used to release the shared read access of a lock when
/// dropped.
#[must_use]
pub struct StaticRWLockReadGuard {
lock: &'static sys::RWLock,
marker: marker::NoSend,
}
/// RAII structure used to release the exclusive write access of a lock when
/// dropped.
#[must_use]
pub struct StaticRWLockWriteGuard {
lock: &'static sys::RWLock,
marker: marker::NoSend,
poison: poison::Guard<'static>,
}
impl<T: Send + Sync> RWLock<T> {
/// Creates a new instance of an RWLock which is unlocked and ready to go.
pub fn new(t: T) -> RWLock<T> {
RWLock { inner: box RWLOCK_INIT, data: UnsafeCell::new(t) }
}
/// Locks this rwlock with shared read access, blocking the current thread
/// until it can be acquired.
///
/// The calling thread will be blocked until there are no more writers which
/// hold the lock. There may be other readers currently inside the lock when
/// this method returns. This method does not provide any guarantees with
/// respect to the ordering of whether contentious readers or writers will
/// acquire the lock first.
///
/// Returns an RAII guard which will release this thread's shared access
/// once it is dropped.
///
/// # Panics
///
/// This function will panic if the RWLock is poisoned. An RWLock is
/// poisoned whenever a writer panics while holding an exclusive lock. The
/// panic will occur immediately after the lock has been acquired.
#[inline]
pub fn read(&self) -> RWLockReadGuard<T> {
unsafe {
let lock: &'static StaticRWLock = &*(&*self.inner as *const _);
RWLockReadGuard::new(self, lock.read())
}
}
/// Attempt to acquire this lock with shared read access.
///
/// This function will never block and will return immediately if `read`
/// would otherwise succeed. Returns `Some` of an RAII guard which will
/// release the shared access of this thread when dropped, or `None` if the
/// access could not be granted. This method does not provide any
/// guarantees with respect to the ordering of whether contentious readers
/// or writers will acquire the lock first.
///
/// # Panics
///
/// This function will panic if the RWLock is poisoned. An RWLock is
/// poisoned whenever a writer panics while holding an exclusive lock. A
/// panic will only occur if the lock is acquired.
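    ///
    /// # Example
    ///
    /// A minimal sketch (with no writer active, the attempt succeeds):
    ///
    /// ```
    /// use std::sync::RWLock;
    ///
    /// let lock = RWLock::new(1i);
    ///
    /// match lock.try_read() {
    ///     Some(guard) => assert_eq!(*guard, 1),
    ///     None => unreachable!(), // no writer is holding the lock
    /// }
    /// ```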
#[inline]
pub fn try_read(&self) -> Option<RWLockReadGuard<T>> {
unsafe {
let lock: &'static StaticRWLock = &*(&*self.inner as *const _);
lock.try_read().map(|guard| {
RWLockReadGuard::new(self, guard)
})
}
}
/// Lock this rwlock with exclusive write access, blocking the current
/// thread until it can be acquired.
///
/// This function will not return while other writers or other readers
/// currently have access to the lock.
///
/// Returns an RAII guard which will drop the write access of this rwlock
/// when dropped.
///
/// # Panics
///
/// This function will panic if the RWLock is poisoned. An RWLock is
/// poisoned whenever a writer panics while holding an exclusive lock. The
/// panic will occur when the lock is acquired.
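    ///
    /// # Example
    ///
    /// A minimal sketch of exclusive access:
    ///
    /// ```
    /// use std::sync::RWLock;
    ///
    /// let lock = RWLock::new(0i);
    ///
    /// {
    ///     let mut w = lock.write();
    ///     *w += 1;
    /// } // exclusive access is released here
    ///
    /// assert_eq!(*lock.read(), 1);
    /// ```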
#[inline]
pub fn write(&self) -> RWLockWriteGuard<T> {
unsafe {
let lock: &'static StaticRWLock = &*(&*self.inner as *const _);
RWLockWriteGuard::new(self, lock.write())
}
}
/// Attempt to lock this rwlock with exclusive write access.
///
    /// This function will never block; it returns `None` immediately if a call
    /// to `write` would otherwise block. If successful, an RAII guard is
    /// returned.
///
/// # Panics
///
/// This function will panic if the RWLock is poisoned. An RWLock is
/// poisoned whenever a writer panics while holding an exclusive lock. A
/// panic will only occur if the lock is acquired.
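    ///
    /// # Example
    ///
    /// A minimal sketch: a live read guard prevents exclusive access, so the
    /// attempt fails without blocking.
    ///
    /// ```
    /// use std::sync::RWLock;
    ///
    /// let lock = RWLock::new(0i);
    ///
    /// let _r = lock.read();
    /// assert!(lock.try_write().is_none());
    /// ```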
#[inline]
pub fn try_write(&self) -> Option<RWLockWriteGuard<T>> {
unsafe {
let lock: &'static StaticRWLock = &*(&*self.inner as *const _);
lock.try_write().map(|guard| {
RWLockWriteGuard::new(self, guard)
})
}
}
}
#[unsafe_destructor]
impl<T> Drop for RWLock<T> {
fn drop(&mut self) {
unsafe { self.inner.inner.destroy() }
}
}
impl StaticRWLock {
/// Locks this rwlock with shared read access, blocking the current thread
/// until it can be acquired.
///
/// See `RWLock::read`.
#[inline]
pub fn read(&'static self) -> StaticRWLockReadGuard {
unsafe { self.inner.read() }
StaticRWLockReadGuard::new(self)
}
/// Attempt to acquire this lock with shared read access.
///
/// See `RWLock::try_read`.
#[inline]
pub fn try_read(&'static self) -> Option<StaticRWLockReadGuard> {
if unsafe { self.inner.try_read() } {
Some(StaticRWLockReadGuard::new(self))
} else {
None
}
}
/// Lock this rwlock with exclusive write access, blocking the current
/// thread until it can be acquired.
///
/// See `RWLock::write`.
#[inline]
pub fn write(&'static self) -> StaticRWLockWriteGuard {
unsafe { self.inner.write() }
StaticRWLockWriteGuard::new(self)
}
/// Attempt to lock this rwlock with exclusive write access.
///
/// See `RWLock::try_write`.
#[inline]
pub fn try_write(&'static self) -> Option<StaticRWLockWriteGuard> {
if unsafe { self.inner.try_write() } {
Some(StaticRWLockWriteGuard::new(self))
} else {
None
}
}
/// Deallocate all resources associated with this static lock.
///
    /// This method is unsafe to call as there is no guarantee that there are no
    /// active users of the lock, and this also doesn't prevent any future users
    /// of this lock. This method must be called to avoid leaking memory on all
    /// platforms.
pub unsafe fn destroy(&'static self) {
self.inner.destroy()
}
}
impl<'rwlock, T> RWLockReadGuard<'rwlock, T> {
fn new(lock: &RWLock<T>, guard: StaticRWLockReadGuard)
-> RWLockReadGuard<T> {
RWLockReadGuard { __lock: lock, __guard: guard }
}
}
impl<'rwlock, T> RWLockWriteGuard<'rwlock, T> {
fn new(lock: &RWLock<T>, guard: StaticRWLockWriteGuard)
-> RWLockWriteGuard<T> {
RWLockWriteGuard { __lock: lock, __guard: guard }
}
}
impl<'rwlock, T> Deref<T> for RWLockReadGuard<'rwlock, T> {
fn deref(&self) -> &T { unsafe { &*self.__lock.data.get() } }
}
impl<'rwlock, T> Deref<T> for RWLockWriteGuard<'rwlock, T> {
fn deref(&self) -> &T { unsafe { &*self.__lock.data.get() } }
}
impl<'rwlock, T> DerefMut<T> for RWLockWriteGuard<'rwlock, T> {
fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.__lock.data.get() } }
}
impl StaticRWLockReadGuard {
fn new(lock: &'static StaticRWLock) -> StaticRWLockReadGuard {
let guard = StaticRWLockReadGuard {
lock: &lock.inner,
marker: marker::NoSend,
};
unsafe { (*lock.poison.get()).borrow().check("rwlock"); }
return guard;
}
}
impl StaticRWLockWriteGuard {
fn new(lock: &'static StaticRWLock) -> StaticRWLockWriteGuard {
unsafe {
let guard = StaticRWLockWriteGuard {
lock: &lock.inner,
marker: marker::NoSend,
poison: (*lock.poison.get()).borrow(),
};
guard.poison.check("rwlock");
return guard;
}
}
}
#[unsafe_destructor]
impl Drop for StaticRWLockReadGuard {
fn drop(&mut self) {
unsafe { self.lock.read_unlock(); }
}
}
#[unsafe_destructor]
impl Drop for StaticRWLockWriteGuard {
fn drop(&mut self) {
self.poison.done();
unsafe { self.lock.write_unlock(); }
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use rand::{mod, Rng};
use task;
use sync::{Arc, RWLock, StaticRWLock, RWLOCK_INIT};
#[test]
fn smoke() {
let l = RWLock::new(());
drop(l.read());
drop(l.write());
drop((l.read(), l.read()));
drop(l.write());
}
#[test]
fn static_smoke() {
static R: StaticRWLock = RWLOCK_INIT;
drop(R.read());
drop(R.write());
drop((R.read(), R.read()));
drop(R.write());
unsafe { R.destroy(); }
}
#[test]
fn frob() {
static R: StaticRWLock = RWLOCK_INIT;
static N: uint = 10;
static M: uint = 1000;
let (tx, rx) = channel::<()>();
for _ in range(0, N) {
let tx = tx.clone();
spawn(proc() {
let mut rng = rand::task_rng();
for _ in range(0, M) {
if rng.gen_weighted_bool(N) {
drop(R.write());
} else {
drop(R.read());
}
}
drop(tx);
});
}
drop(tx);
let _ = rx.recv_opt();
unsafe { R.destroy(); }
}
#[test]
#[should_fail]
fn test_rw_arc_poison_wr() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.write();
assert_eq!(*lock, 2);
});
let lock = arc.read();
assert_eq!(*lock, 1);
}
#[test]
#[should_fail]
fn test_rw_arc_poison_ww() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.write();
assert_eq!(*lock, 2);
});
let lock = arc.write();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc_no_poison_rr() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.read();
assert_eq!(*lock, 2);
});
let lock = arc.read();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc_no_poison_rw() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try(proc() {
let lock = arc2.read();
assert_eq!(*lock, 2);
});
let lock = arc.write();
assert_eq!(*lock, 1);
}
#[test]
fn test_rw_arc() {
let arc = Arc::new(RWLock::new(0i));
let arc2 = arc.clone();
let (tx, rx) = channel();
task::spawn(proc() {
let mut lock = arc2.write();
for _ in range(0u, 10) {
let tmp = *lock;
*lock = -1;
task::deschedule();
*lock = tmp + 1;
}
tx.send(());
});
// Readers try to catch the writer in the act
let mut children = Vec::new();
for _ in range(0u, 5) {
let arc3 = arc.clone();
children.push(task::try_future(proc() {
let lock = arc3.read();
assert!(*lock >= 0);
}));
}
// Wait for children to pass their asserts
for r in children.iter_mut() {
assert!(r.get_ref().is_ok());
}
// Wait for writer to finish
rx.recv();
let lock = arc.read();
assert_eq!(*lock, 10);
}
#[test]
fn test_rw_arc_access_in_unwind() {
let arc = Arc::new(RWLock::new(1i));
let arc2 = arc.clone();
let _ = task::try::<()>(proc() {
struct Unwinder {
i: Arc<RWLock<int>>,
}
impl Drop for Unwinder {
fn drop(&mut self) {
let mut lock = self.i.write();
*lock += 1;
}
}
let _u = Unwinder { i: arc2 };
panic!();
});
let lock = arc.read();
assert_eq!(*lock, 2);
}
}
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use time::Duration;
use sys_common::mutex::{mod, Mutex};
use sys::condvar as imp;
/// An OS-based condition variable.
///
/// This structure is the lowest layer possible on top of the OS-provided
/// condition variables. It is consequently entirely unsafe to use. It is
/// recommended to use the safer types at the top level of this crate instead of
/// this type.
pub struct Condvar(imp::Condvar);
/// Static initializer for condition variables.
pub const CONDVAR_INIT: Condvar = Condvar(imp::CONDVAR_INIT);
impl Condvar {
/// Creates a new condition variable for use.
///
/// Behavior is undefined if the condition variable is moved after it is
/// first used with any of the functions below.
#[inline]
pub unsafe fn new() -> Condvar { Condvar(imp::Condvar::new()) }
/// Signal one waiter on this condition variable to wake up.
#[inline]
pub unsafe fn notify_one(&self) { self.0.notify_one() }
/// Awaken all current waiters on this condition variable.
#[inline]
pub unsafe fn notify_all(&self) { self.0.notify_all() }
/// Wait for a signal on the specified mutex.
///
/// Behavior is undefined if the mutex is not locked by the current thread.
/// Behavior is also undefined if more than one mutex is used concurrently
/// on this condition variable.
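    ///
    /// A hedged sketch of the intended protocol (illustrative only; the
    /// `condition_holds` predicate is hypothetical, and this module is not
    /// public API):
    ///
    /// ```ignore
    /// unsafe {
    ///     let mutex = Mutex::new();
    ///     let cond = Condvar::new();
    ///     mutex.lock();
    ///     while !condition_holds() {
    ///         // woken either by a notification or spuriously, so the
    ///         // predicate must be re-checked in a loop
    ///         cond.wait(&mutex);
    ///     }
    ///     mutex.unlock();
    /// }
    /// ```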
#[inline]
pub unsafe fn wait(&self, mutex: &Mutex) { self.0.wait(mutex::raw(mutex)) }
    /// Wait for a signal on the specified mutex with a timeout duration
    /// specified by `dur` (a relative time into the future). Returns `false`
    /// if the wait timed out, and `true` otherwise.
///
/// Behavior is undefined if the mutex is not locked by the current thread.
/// Behavior is also undefined if more than one mutex is used concurrently
/// on this condition variable.
#[inline]
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
self.0.wait_timeout(mutex::raw(mutex), dur)
}
/// Deallocate all resources associated with this condition variable.
///
    /// Behavior is undefined if there are any current users of this condition
    /// variable, or if there will be any future users.
#[inline]
pub unsafe fn destroy(&self) { self.0.destroy() }
}
@@ -20,13 +20,14 @@
 //! can be created in the future and there must be no active timers at that
 //! time.
 
+use prelude::*;
+use cell::UnsafeCell;
 use mem;
 use rustrt::bookkeeping;
-use rustrt::mutex::StaticNativeMutex;
 use rustrt;
-use cell::UnsafeCell;
+use sync::{StaticMutex, StaticCondvar};
 use sys::helper_signal;
-use prelude::*;
 use task;
@@ -39,7 +40,8 @@
 /// is for static initialization.
 pub struct Helper<M> {
     /// Internal lock which protects the remaining fields
-    pub lock: StaticNativeMutex,
+    pub lock: StaticMutex,
+    pub cond: StaticCondvar,
 
     // You'll notice that the remaining fields are UnsafeCell<T>, and this is
     // because all helper thread operations are done through &self, but we need
@@ -53,6 +55,9 @@ pub struct Helper<M> {
     /// Flag if this helper thread has booted and been initialized yet.
     pub initialized: UnsafeCell<bool>,
+
+    /// Flag if this helper thread has shut down
+    pub shutdown: UnsafeCell<bool>,
 }
impl<M: Send> Helper<M> {
@@ -80,7 +85,9 @@ pub fn boot<T: Send>(&'static self,
         task::spawn(proc() {
             bookkeeping::decrement();
             helper(receive, rx, t);
-            self.lock.lock().signal()
+            let _g = self.lock.lock();
+            *self.shutdown.get() = true;
+            self.cond.notify_one()
         });
 
         rustrt::at_exit(proc() { self.shutdown() });
@@ -119,7 +126,9 @@ fn shutdown(&'static self) {
         helper_signal::signal(*self.signal.get() as helper_signal::signal);
 
         // Wait for the child to exit
-        guard.wait();
+        while !*self.shutdown.get() {
+            self.cond.wait(&guard);
+        }
         drop(guard);
@@ -19,8 +19,11 @@
 use path::BytesContainer;
 use collections;
 
-pub mod net;
+pub mod condvar;
 pub mod helper_thread;
+pub mod mutex;
+pub mod net;
+pub mod rwlock;
 pub mod thread_local;
 
 // common error constructors