提交 7d756e44 编写于 作者: A Alex Crichton

rustrt: Reorganize task usage

Most of the comments are available on the Task structure itself, but this commit
is aimed at making FFI-style usage of Rust tasks a little nicer.

Primarily, this commit enables re-use of tasks across multiple invocations. The
method `run` will no longer unconditionally destroy the task itself. Rather, the
task will be internally re-usable if the closure specified did not fail. Once a
task has failed once it is considered poisoned and it can never be used again.

Along the way I tried to document shortcomings of the current method of tearing
down a task, opening a few issues as well. For now none of the behavior is a
showstopper, but it's useful to acknowledge it. Also along the way I attempted
to remove as much `unsafe` code as possible, opting for safer abstractions.
上级 bab614f5
......@@ -299,7 +299,7 @@ pub fn start(argc: int, argv: **u8,
let mut ret = None;
simple::task().run(|| {
ret = Some(run(event_loop_factory, main.take_unwrap()));
});
}).destroy();
// unsafe is ok b/c we're sure that the runtime is gone
unsafe { rt::cleanup() }
ret.unwrap()
......
......@@ -110,7 +110,7 @@ pub enum Home {
// requested. This is the "try/catch" block for this green task and
// is the wrapper for *all* code run in the task.
let mut start = Some(start);
let task = task.swap().run(|| start.take_unwrap()());
let task = task.swap().run(|| start.take_unwrap()()).destroy();
// Once the function has exited, it's time to run the termination
// routine. This means we need to context switch one more time but
......@@ -120,7 +120,7 @@ pub enum Home {
// this we could add a `terminate` function to the `Runtime` trait
// in libstd, but that seems less appropriate since the coversion
// method exists.
GreenTask::convert(task).terminate()
GreenTask::convert(task).terminate();
}
impl GreenTask {
......
......@@ -134,13 +134,12 @@ pub fn start(argc: int, argv: **u8, main: proc()) -> int {
let mut main = Some(main);
let mut task = task::new((my_stack_bottom, my_stack_top));
task.name = Some(str::Slice("<main>"));
let t = task.run(|| {
drop(task.run(|| {
unsafe {
rt::stack::record_stack_bounds(my_stack_bottom, my_stack_top);
}
exit_code = Some(run(main.take_unwrap()));
});
drop(t);
}).destroy());
unsafe { rt::cleanup(); }
// If the exit code wasn't set, then the task block must have failed.
return exit_code.unwrap_or(rt::DEFAULT_ERROR_CODE);
......
......@@ -92,8 +92,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
let mut f = Some(f);
let mut task = task;
task.put_runtime(ops);
let t = task.run(|| { f.take_unwrap()() });
drop(t);
drop(task.run(|| { f.take_unwrap()() }).destroy());
bookkeeping::decrement();
})
}
......
......@@ -110,7 +110,12 @@ pub fn free(&mut self, alloc: *mut Box) {
self.memory_region.free(alloc);
}
pub unsafe fn annihilate(&mut self) {
/// Immortalize all pending allocations, forcing them to live forever.
///
/// This function will freeze all allocations to prevent all pending
/// allocations from being deallocated. This is used in preparation for when
/// a task is about to destroy TLD.
pub unsafe fn immortalize(&mut self) {
let mut n_total_boxes = 0u;
// Pass 1: Make all boxes immortal.
......@@ -122,6 +127,17 @@ pub unsafe fn annihilate(&mut self) {
(*alloc).ref_count = RC_IMMORTAL;
});
if debug_mem() {
// We do logging here w/o allocation.
rterrln!("total boxes annihilated: {}", n_total_boxes);
}
}
/// Continues deallocation of the all pending allocations in this arena.
///
/// This is invoked from the destructor, and requires that `immortalize` has
/// been called previously.
unsafe fn annihilate(&mut self) {
// Pass 2: Drop all boxes.
//
// In this pass, unique-managed boxes may get freed, but not
......@@ -142,11 +158,6 @@ pub unsafe fn annihilate(&mut self) {
self.each_live_alloc(true, |me, alloc| {
me.free(alloc);
});
if debug_mem() {
// We do logging here w/o allocation.
rterrln!("total boxes annihilated: {}", n_total_boxes);
}
}
unsafe fn each_live_alloc(&mut self, read_next_before: bool,
......@@ -170,6 +181,7 @@ unsafe fn each_live_alloc(&mut self, read_next_before: bool,
impl Drop for LocalHeap {
fn drop(&mut self) {
unsafe { self.annihilate() }
assert!(self.live_allocs.is_null());
}
}
......
......@@ -19,8 +19,8 @@
use alloc::owned::{AnyOwnExt, Box};
use core::any::Any;
use core::atomics::{AtomicUint, SeqCst};
use core::finally::Finally;
use core::iter::Take;
use core::kinds::marker;
use core::mem;
use core::raw;
......@@ -29,14 +29,71 @@
use local::Local;
use local_heap::LocalHeap;
use rtio::LocalIo;
use unwind;
use unwind::Unwinder;
use collections::str::SendStr;
/// The Task struct represents all state associated with a rust
/// task. There are at this point two primary "subtypes" of task,
/// however instead of using a subtype we just have a "task_type" field
/// in the struct. This contains a pointer to another struct that holds
/// the type-specific state.
/// State associated with Rust tasks.
///
/// Rust tasks are primarily built with two separate components. One is this
/// structure which handles standard services such as TLD, unwinding support,
/// naming of a task, etc. The second component is the runtime of this task, a
/// `Runtime` trait object.
///
/// The `Runtime` object instructs this task how it can perform critical
/// operations such as blocking, rescheduling, I/O constructors, etc. The two
/// halves are separately owned, but one is often found contained in the other.
/// A task's runtime can be reflected upon with the `maybe_take_runtime` method,
/// and otherwise its ownership is managed with `take_runtime` and
/// `put_runtime`.
///
/// In general, this structure should not be used. This is meant to be an
/// unstable internal detail of the runtime itself. From time-to-time, however,
/// it is useful to manage tasks directly. An example of this would be
/// interoperating with the Rust runtime from FFI callbacks or such. For this
/// reason, there are two methods of note with the `Task` structure.
///
/// * `run` - This function will execute a closure inside the context of a task.
/// Failure is caught and handled via the task's on_exit callback. If
/// this fails, the task is still returned, but it can no longer be
/// used, it is poisoned.
///
/// * `destroy` - This is a required function to call to destroy a task. If a
/// task falls out of scope without calling `destroy`, its
/// destructor bomb will go off, aborting the process.
///
/// With these two methods, tasks can be re-used to execute code inside of its
/// context while having a point in the future where destruction is allowed.
/// More information can be found on these specific methods.
///
/// # Example
///
/// ```no_run
/// extern crate native;
/// use std::uint;
/// # fn main() {
///
/// // Create a task using a native runtime
/// let task = native::task::new((0, uint::MAX));
///
/// // Run some code, catching any possible failures
/// let task = task.run(|| {
/// // Run some code inside this task
/// println!("Hello with a native runtime!");
/// });
///
/// // Run some code again, catching the failure
/// let task = task.run(|| {
/// fail!("oh no, what to do!");
/// });
///
/// // Now that the task is failed, it can never be used again
/// assert!(task.is_destroyed());
///
/// // Deallocate the resources associated with this task
/// task.destroy();
/// # }
/// ```
pub struct Task {
pub heap: LocalHeap,
pub gc: GarbageCollector,
......@@ -79,7 +136,8 @@ pub enum BlockedTask {
/// Per-task state related to task death, killing, failure, etc.
pub struct Death {
pub on_exit: Option<proc(Result): Send>,
pub on_exit: Option<proc(Result):Send>,
marker: marker::NoCopy,
}
pub struct BlockedTasks {
......@@ -87,6 +145,13 @@ pub struct BlockedTasks {
}
impl Task {
/// Creates a new uninitialized task.
///
/// This method cannot be used to immediately invoke `run` because the task
/// itself will likely require a runtime to be inserted via `put_runtime`.
///
/// Note that you likely don't want to call this function, but rather the
/// task creation functions through libnative or libgreen.
pub fn new() -> Task {
Task {
heap: LocalHeap::new(),
......@@ -100,80 +165,181 @@ pub fn new() -> Task {
}
}
/// Executes the given closure as if it's running inside this task. The task
/// is consumed upon entry, and the destroyed task is returned from this
/// function in order for the caller to free. This function is guaranteed to
/// not unwind because the closure specified is run inside of a `rust_try`
/// block. (this is the only try/catch block in the world).
/// Consumes ownership of a task, runs some code, and returns the task back.
///
/// This function is *not* meant to be abused as a "try/catch" block. This
/// is meant to be used at the absolute boundaries of a task's lifetime, and
/// only for that purpose.
pub fn run(~self, mut f: ||) -> Box<Task> {
// Need to put ourselves into TLS, but also need access to the unwinder.
// Unsafely get a handle to the task so we can continue to use it after
// putting it in tls (so we can invoke the unwinder).
let handle: *mut Task = unsafe {
*mem::transmute::<&Box<Task>, &*mut Task>(&self)
};
/// This function can be used as an emulated "try/catch" to interoperate
/// with the rust runtime at the outermost boundary. It is not possible to
/// use this function in a nested fashion (a try/catch inside of another
/// try/catch). Invoking this funciton is quite cheap.
///
/// If the closure `f` succeeds, then the returned task can be used again
/// for another invocation of `run`. If the closure `f` fails then `self`
/// will be internally destroyed along with all of the other associated
/// resources of this task. The `on_exit` callback is invoked with the
/// cause of failure (not returned here). This can be discovered by querying
/// `is_destroyed()`.
///
/// Note that it is possible to view partial execution of the closure `f`
/// because it is not guaranteed to run to completion, but this function is
/// guaranteed to return if it fails. Care should be taken to ensure that
/// stack references made by `f` are handled appropriately.
///
/// It is invalid to call this function with a task that has been previously
/// destroyed via a failed call to `run`.
///
/// # Example
///
/// ```no_run
/// extern crate native;
/// use std::uint;
/// # fn main() {
///
/// // Create a new native task
/// let task = native::task::new((0, uint::MAX));
///
/// // Run some code once and then destroy this task
/// task.run(|| {
/// println!("Hello with a native runtime!");
/// }).destroy();
/// # }
/// ```
pub fn run(~self, f: ||) -> Box<Task> {
assert!(!self.is_destroyed(), "cannot re-use a destroyed task");
// First, make sure that no one else is in TLS. This does not allow
// recursive invocations of run(). If there's no one else, then
// relinquish ownership of ourselves back into TLS.
if Local::exists(None::<Task>) {
fail!("cannot run a task recursively inside another");
}
Local::put(self);
// The only try/catch block in the world. Attempt to run the task's
// client-specified code and catch any failures.
let try_block = || {
// Run the task main function, then do some cleanup.
f.finally(|| {
// First, destroy task-local storage. This may run user dtors.
//
// FIXME #8302: Dear diary. I'm so tired and confused.
// There's some interaction in rustc between the box
// annihilator and the TLS dtor by which TLS is
// accessed from annihilated box dtors *after* TLS is
// destroyed. Somehow setting TLS back to null, as the
// old runtime did, makes this work, but I don't currently
// understand how. I would expect that, if the annihilator
// reinvokes TLS while TLS is uninitialized, that
// TLS would be reinitialized but never destroyed,
// but somehow this works. I have no idea what's going
// on but this seems to make things magically work. FML.
//
// (added after initial comment) A possible interaction here is
// that the destructors for the objects in TLS themselves invoke
// TLS, or possibly some destructors for those objects being
// annihilated invoke TLS. Sadly these two operations seemed to
// be intertwined, and miraculously work for now...
drop({
let mut task = Local::borrow(None::<Task>);
let &LocalStorage(ref mut optmap) = &mut task.storage;
optmap.take()
});
// Destroy remaining boxes. Also may run user dtors.
let mut heap = {
let mut task = Local::borrow(None::<Task>);
mem::replace(&mut task.heap, LocalHeap::new())
};
unsafe { heap.annihilate() }
drop(heap);
})
};
// There are two primary reasons that general try/catch is unsafe. The
// first is that we do not support nested try/catch. The above check for
// an existing task in TLS is sufficient for this invariant to be
// upheld. The second is that unwinding while unwinding is not defined.
// We take care of that by having an 'unwinding' flag in the task
// itself. For these reasons, this unsafety should be ok.
let result = unsafe { unwind::try(f) };
// After running the closure given return the task back out if it ran
// successfully, or clean up the task if it failed.
let task: Box<Task> = Local::take();
match result {
Ok(()) => task,
Err(cause) => { task.cleanup(Err(cause)) }
}
}
unsafe { (*handle).unwinder.try(try_block); }
/// Destroy all associated resources of this task.
///
/// This function will perform any necessary clean up to prepare the task
/// for destruction. It is required that this is called before a `Task`
/// falls out of scope.
///
/// The returned task cannot be used for running any more code, but it may
/// be used to extract the runtime as necessary.
pub fn destroy(~self) -> Box<Task> {
if self.is_destroyed() {
self
} else {
self.cleanup(Ok(()))
}
}
// Here we must unsafely borrow the task in order to not remove it from
// TLS. When collecting failure, we may attempt to send on a channel (or
// just run arbitrary code), so we must be sure to still have a local
// task in TLS.
unsafe {
let me: *mut Task = Local::unsafe_borrow();
(*me).death.collect_failure((*me).unwinder.result());
/// Cleans up a task, processing the result of the task as appropriate.
///
/// This function consumes ownership of the task, deallocating it once it's
/// done being processed. It is assumed that TLD and the local heap have
/// already been destroyed and/or annihilated.
fn cleanup(~self, result: Result) -> Box<Task> {
// The first thing to do when cleaning up is to deallocate our local
// resources, such as TLD and GC data.
//
// FIXME: there are a number of problems with this code
//
// 1. If any TLD object fails destruction, then all of TLD will leak.
// This appears to be a consequence of #14875.
//
// 2. Failing during GC annihilation aborts the runtime #14876.
//
// 3. Setting a TLD key while destroying TLD or while destroying GC will
// abort the runtime #14807.
//
// 4. Invoking GC in GC destructors will abort the runtime #6996.
//
// 5. The order of destruction of TLD and GC matters, but either way is
// susceptible to leaks (see 3/4) #8302.
//
// That being said, there are a few upshots to this code
//
// 1. If TLD destruction fails, heap destruction will be attempted.
// There is a test for this at fail-during-tld-destroy.rs. Sadly the
// other way can't be tested due to point 2 above. Note that we must
// immortalize the heap first becuase if any deallocations are
// attempted while TLD is being dropped it will attempt to free the
// allocation from the wrong heap (because the current one has been
// replaced).
//
// 2. One failure in destruction is tolerable, so long as the task
// didn't originally fail while it was running.
//
// And with all that in mind, we attempt to clean things up!
let mut task = self.run(|| {
let mut task = Local::borrow(None::<Task>);
let tld = {
let &LocalStorage(ref mut optmap) = &mut task.storage;
optmap.take()
};
let mut heap = mem::replace(&mut task.heap, LocalHeap::new());
unsafe { heap.immortalize() }
drop(task);
// First, destroy task-local storage. This may run user dtors.
drop(tld);
// Destroy remaining boxes. Also may run user dtors.
drop(heap);
});
// If the above `run` block failed, then it must be the case that the
// task had previously succeeded. This also means that the code below
// was recursively run via the `run` method invoking this method. In
// this case, we just make sure the world is as we thought, and return.
if task.is_destroyed() {
rtassert!(result.is_ok())
return task
}
// After taking care of the data above, we need to transmit the result
// of this task.
let what_to_do = task.death.on_exit.take();
Local::put(task);
// FIXME: this is running in a seriously constrained context. If this
// allocates GC or allocates TLD then it will likely abort the
// runtime. Similarly, if this fails, this will also likely abort
// the runtime.
//
// This closure is currently limited to a channel send via the
// standard library's task interface, but this needs
// reconsideration to whether it's a reasonable thing to let a
// task to do or not.
match what_to_do {
Some(f) => { f(result) }
None => { drop(result) }
}
let mut me: Box<Task> = Local::take();
me.destroyed = true;
return me;
// Now that we're done, we remove the task from TLS and flag it for
// destruction.
let mut task: Box<Task> = Local::take();
task.destroyed = true;
return task;
}
/// Queries whether this can be destroyed or not.
pub fn is_destroyed(&self) -> bool { self.destroyed }
/// Inserts a runtime object into this task, transferring ownership to the
/// task. It is illegal to replace a previous runtime object in this task
/// with this argument.
......@@ -182,6 +348,13 @@ pub fn put_runtime(&mut self, ops: Box<Runtime + Send>) {
self.imp = Some(ops);
}
/// Removes the runtime from this task, transferring ownership to the
/// caller.
pub fn take_runtime(&mut self) -> Box<Runtime + Send> {
assert!(self.imp.is_some());
self.imp.take().unwrap()
}
/// Attempts to extract the runtime as a specific type. If the runtime does
/// not have the provided type, then the runtime is not removed. If the
/// runtime does have the specified type, then it is removed and returned
......@@ -374,21 +547,7 @@ pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
impl Death {
pub fn new() -> Death {
Death { on_exit: None, }
}
/// Collect failure exit codes from children and propagate them to a parent.
pub fn collect_failure(&mut self, result: Result) {
match self.on_exit.take() {
Some(f) => f(result),
None => {}
}
}
}
impl Drop for Death {
fn drop(&mut self) {
// make this type noncopyable
Death { on_exit: None, marker: marker::NoCopy }
}
}
......
......@@ -78,7 +78,6 @@
pub struct Unwinder {
unwinding: bool,
cause: Option<Box<Any + Send>>
}
struct Exception {
......@@ -107,25 +106,12 @@ impl Unwinder {
pub fn new() -> Unwinder {
Unwinder {
unwinding: false,
cause: None,
}
}
pub fn unwinding(&self) -> bool {
self.unwinding
}
pub fn try(&mut self, f: ||) {
self.cause = unsafe { try(f) }.err();
}
pub fn result(&mut self) -> Result {
if self.unwinding {
Err(self.cause.take().unwrap())
} else {
Ok(())
}
}
}
/// Invoke a closure, capturing the cause of failure if one occurs.
......
......@@ -295,8 +295,8 @@ pub fn try_future<T:Send>(self, f: proc():Send -> T)
let (tx_done, rx_done) = channel(); // signal that task has exited
let (tx_retv, rx_retv) = channel(); // return value from task
let on_exit = proc(res) { tx_done.send(res) };
self.spawn_internal(proc() { tx_retv.send(f()) },
let on_exit = proc(res) { let _ = tx_done.send_opt(res); };
self.spawn_internal(proc() { let _ = tx_retv.send_opt(f()); },
Some(on_exit));
Future::from_fn(proc() {
......@@ -641,3 +641,14 @@ fn test_stdout() {
// NOTE: the corresponding test for stderr is in run-pass/task-stderr, due
// to the test harness apparently interfering with stderr configuration.
}
#[test]
fn task_abort_no_kill_runtime() {
use std::io::timer;
use mem;
let mut tb = TaskBuilder::new();
let rx = tb.try_future(proc() {});
mem::drop(rx);
timer::sleep(1000);
}
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::task;
use std::gc::{GC, Gc};
use std::cell::RefCell;
static mut DROPS: uint = 0;
struct Foo(bool);
impl Drop for Foo {
fn drop(&mut self) {
let Foo(fail) = *self;
unsafe { DROPS += 1; }
if fail { fail!() }
}
}
fn tld_fail(fail: bool) {
local_data_key!(foo: Foo);
foo.replace(Some(Foo(fail)));
}
fn gc_fail(fail: bool) {
struct A {
inner: RefCell<Option<Gc<A>>>,
other: Foo,
}
let a = box(GC) A {
inner: RefCell::new(None),
other: Foo(fail),
};
*a.inner.borrow_mut() = Some(a.clone());
}
fn main() {
let _ = task::try(proc() {
tld_fail(true);
gc_fail(false);
});
unsafe {
assert_eq!(DROPS, 2);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册