task.rs 16.2 KB
Newer Older
D
Derek Chiang 已提交
1
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2 3 4 5 6 7 8 9 10 11 12 13 14 15
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Language-level runtime services that should reasonably expected
//! to be available 'everywhere'. Local heaps, GC, unwinding,
//! local storage, and logging. Even a 'freestanding' Rust would likely want
//! to implement this.

16 17
use core::prelude::*;

18
use alloc::arc::Arc;
19 20 21 22 23 24
use alloc::owned::{AnyOwnExt, Box};
use core::any::Any;
use core::iter::Take;
use core::mem;
use core::finally::Finally;
use core::atomics::{AtomicUint, SeqCst};
25

26
use local_data;
27 28 29 30 31 32
use Runtime;
use local::Local;
use local_heap::LocalHeap;
use rtio::LocalIo;
use unwind::Unwinder;
use collections::str::SendStr;
33

D
Derek Chiang 已提交
34 35 36 37 38
/// The Task struct represents all state associated with a rust
/// task. There are at this point two primary "subtypes" of task,
/// however instead of using a subtype we just have a "task_type" field
/// in the struct. This contains a pointer to another struct that holds
/// the type-specific state.
39
pub struct Task {
40 41 42 43 44 45 46
    pub heap: LocalHeap,
    pub gc: GarbageCollector,
    pub storage: LocalStorage,
    pub unwinder: Unwinder,
    pub death: Death,
    pub destroyed: bool,
    pub name: Option<SendStr>,
47

48
    imp: Option<Box<Runtime:Send>>,
49 50
}

51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
pub struct TaskOpts {
    /// Invoke this procedure with the result of the task when it finishes.
    pub on_exit: Option<proc(Result):Send>,
    /// A name for the task-to-be, for identification in failure messages
    pub name: Option<SendStr>,
    /// The size of the stack for the spawned task
    pub stack_size: Option<uint>,
}

/// Indicates the manner in which a task exited.
///
/// A task that completes without failing is considered to exit successfully.
///
/// If you wish for this result's delivery to block until all
/// children tasks complete, recommend using a result future.
pub type Result = ::core::result::Result<(), Box<Any:Send>>;

68
pub struct GarbageCollector;
69
pub struct LocalStorage(pub Option<local_data::Map>);
70

71 72 73
/// A handle to a blocked task. Usually this means having the Box<Task>
/// pointer by ownership, but if the task is killable, a killer can steal it
/// at any time.
74
pub enum BlockedTask {
75
    Owned(Box<Task>),
76
    Shared(Arc<AtomicUint>),
77 78
}

79 80
/// Per-task state related to task death, killing, failure, etc.
pub struct Death {
81
    pub on_exit: Option<proc(Result):Send>,
82 83
}

P
Palmer Cox 已提交
84
pub struct BlockedTasks {
85
    inner: Arc<AtomicUint>,
86
}
87

88
impl Task {
89
    pub fn new() -> Task {
90
        Task {
91
            heap: LocalHeap::new(),
92
            gc: GarbageCollector,
93
            storage: LocalStorage(None),
94
            unwinder: Unwinder::new(),
95
            death: Death::new(),
96
            destroyed: false,
B
Ben Blum 已提交
97
            name: None,
98
            imp: None,
99 100
        }
    }
101

102 103 104 105 106 107 108 109 110
    /// Executes the given closure as if it's running inside this task. The task
    /// is consumed upon entry, and the destroyed task is returned from this
    /// function in order for the caller to free. This function is guaranteed to
    /// not unwind because the closure specified is run inside of a `rust_try`
    /// block. (this is the only try/catch block in the world).
    ///
    /// This function is *not* meant to be abused as a "try/catch" block. This
    /// is meant to be used at the absolute boundaries of a task's lifetime, and
    /// only for that purpose.
111
    pub fn run(~self, mut f: ||) -> Box<Task> {
112 113 114 115
        // Need to put ourselves into TLS, but also need access to the unwinder.
        // Unsafely get a handle to the task so we can continue to use it after
        // putting it in tls (so we can invoke the unwinder).
        let handle: *mut Task = unsafe {
A
Alex Crichton 已提交
116
            *mem::transmute::<&Box<Task>, &*mut Task>(&self)
117 118
        };
        Local::put(self);
119

120 121
        // The only try/catch block in the world. Attempt to run the task's
        // client-specified code and catch any failures.
122
        let try_block = || {
123

124
            // Run the task main function, then do some cleanup.
125
            f.finally(|| {
126 127
                // First, destroy task-local storage. This may run user dtors.
                //
128 129 130 131 132 133 134 135 136 137 138
                // FIXME #8302: Dear diary. I'm so tired and confused.
                // There's some interaction in rustc between the box
                // annihilator and the TLS dtor by which TLS is
                // accessed from annihilated box dtors *after* TLS is
                // destroyed. Somehow setting TLS back to null, as the
                // old runtime did, makes this work, but I don't currently
                // understand how. I would expect that, if the annihilator
                // reinvokes TLS while TLS is uninitialized, that
                // TLS would be reinitialized but never destroyed,
                // but somehow this works. I have no idea what's going
                // on but this seems to make things magically work. FML.
139 140 141 142 143 144
                //
                // (added after initial comment) A possible interaction here is
                // that the destructors for the objects in TLS themselves invoke
                // TLS, or possibly some destructors for those objects being
                // annihilated invoke TLS. Sadly these two operations seemed to
                // be intertwined, and miraculously work for now...
145
                let mut task = Local::borrow(None::<Task>);
146
                let storage_map = {
147
                    let &LocalStorage(ref mut optmap) = &mut task.storage;
148 149
                    optmap.take()
                };
150
                drop(task);
151
                drop(storage_map);
152

153
                // Destroy remaining boxes. Also may run user dtors.
154 155 156 157 158
                let mut task = Local::borrow(None::<Task>);
                let mut heap = mem::replace(&mut task.heap, LocalHeap::new());
                drop(task);
                unsafe { heap.annihilate() }
                drop(heap);
159
            })
160 161 162
        };

        unsafe { (*handle).unwinder.try(try_block); }
163

A
Alex Crichton 已提交
164 165 166 167
        // Here we must unsafely borrow the task in order to not remove it from
        // TLS. When collecting failure, we may attempt to send on a channel (or
        // just run aribitrary code), so we must be sure to still have a local
        // task in TLS.
A
Alex Crichton 已提交
168 169 170 171
        unsafe {
            let me: *mut Task = Local::unsafe_borrow();
            (*me).death.collect_failure((*me).unwinder.result());
        }
172
        let mut me: Box<Task> = Local::take();
173 174
        me.destroyed = true;
        return me;
175
    }
176

177 178 179
    /// Inserts a runtime object into this task, transferring ownership to the
    /// task. It is illegal to replace a previous runtime object in this task
    /// with this argument.
180
    pub fn put_runtime(&mut self, ops: Box<Runtime:Send>) {
181 182 183
        assert!(self.imp.is_none());
        self.imp = Some(ops);
    }
184

185 186 187 188 189 190 191
    /// Attempts to extract the runtime as a specific type. If the runtime does
    /// not have the provided type, then the runtime is not removed. If the
    /// runtime does have the specified type, then it is removed and returned
    /// (transfer of ownership).
    ///
    /// It is recommended to only use this method when *absolutely necessary*.
    /// This function may not be available in the future.
192
    pub fn maybe_take_runtime<T: 'static>(&mut self) -> Option<Box<T>> {
193
        // This is a terrible, terrible function. The general idea here is to
194 195 196 197
        // take the runtime, cast it to Box<Any>, check if it has the right
        // type, and then re-cast it back if necessary. The method of doing
        // this is pretty sketchy and involves shuffling vtables of trait
        // objects around, but it gets the job done.
198
        //
199
        // FIXME: This function is a serious code smell and should be avoided at
200 201 202 203 204
        //      all costs. I have yet to think of a method to avoid this
        //      function, and I would be saddened if more usage of the function
        //      crops up.
        unsafe {
            let imp = self.imp.take_unwrap();
A
Alex Crichton 已提交
205
            let &(vtable, _): &(uint, uint) = mem::transmute(&imp);
206 207 208
            match imp.wrap().move::<T>() {
                Ok(t) => Some(t),
                Err(t) => {
A
Alex Crichton 已提交
209
                    let (_, obj): (uint, uint) = mem::transmute(t);
210
                    let obj: Box<Runtime:Send> =
A
Alex Crichton 已提交
211
                        mem::transmute((vtable, obj));
212 213 214
                    self.put_runtime(obj);
                    None
                }
215 216 217 218
            }
        }
    }

219 220
    /// Spawns a sibling to this task. The newly spawned task is configured with
    /// the `opts` structure and will run `f` as the body of its code.
A
Alex Crichton 已提交
221
    pub fn spawn_sibling(mut ~self, opts: TaskOpts, f: proc():Send) {
222 223
        let ops = self.imp.take_unwrap();
        ops.spawn_sibling(self, opts, f)
224 225
    }

226 227 228 229
    /// Deschedules the current task, invoking `f` `amt` times. It is not
    /// recommended to use this function directly, but rather communication
    /// primitives in `std::comm` should be used.
    pub fn deschedule(mut ~self, amt: uint,
230
                      f: |BlockedTask| -> ::core::result::Result<(), BlockedTask>) {
231 232 233 234
        let ops = self.imp.take_unwrap();
        ops.deschedule(amt, self, f)
    }

H
Huon Wilson 已提交
235
    /// Wakes up a previously blocked task, optionally specifying whether the
236 237
    /// current task can accept a change in scheduling. This function can only
    /// be called on tasks that were previously blocked in `deschedule`.
238
    pub fn reawaken(mut ~self) {
239
        let ops = self.imp.take_unwrap();
240
        ops.reawaken(self);
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
    }

    /// Yields control of this task to another task. This function will
    /// eventually return, but possibly not immediately. This is used as an
    /// opportunity to allow other tasks a chance to run.
    pub fn yield_now(mut ~self) {
        let ops = self.imp.take_unwrap();
        ops.yield_now(self);
    }

    /// Similar to `yield_now`, except that this function may immediately return
    /// without yielding (depending on what the runtime decides to do).
    pub fn maybe_yield(mut ~self) {
        let ops = self.imp.take_unwrap();
        ops.maybe_yield(self);
    }

    /// Acquires a handle to the I/O factory that this task contains, normally
    /// stored in the task's runtime. This factory may not always be available,
    /// which is why the return type is `Option`
    pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> {
        self.imp.get_mut_ref().local_io()
263
    }
264 265 266 267

    /// Returns the stack bounds for this task in (lo, hi) format. The stack
    /// bounds may not be known for all tasks, so the return value may be
    /// `None`.
268
    pub fn stack_bounds(&self) -> (uint, uint) {
269 270
        self.imp.get_ref().stack_bounds()
    }
271 272 273 274 275 276

    /// Returns whether it is legal for this task to block the OS thread that it
    /// is running on.
    pub fn can_block(&self) -> bool {
        self.imp.get_ref().can_block()
    }
277 278
}

279
impl Drop for Task {
D
Daniel Micay 已提交
280
    fn drop(&mut self) {
281
        rtdebug!("called drop for a task: {}", self as *mut Task as uint);
282
        rtassert!(self.destroyed);
283
    }
284 285
}

286 287 288 289 290 291
impl TaskOpts {
    pub fn new() -> TaskOpts {
        TaskOpts { on_exit: None, name: None, stack_size: None }
    }
}

P
Palmer Cox 已提交
292
impl Iterator<BlockedTask> for BlockedTasks {
293 294
    fn next(&mut self) -> Option<BlockedTask> {
        Some(Shared(self.inner.clone()))
295
    }
296
}
297

298 299
impl BlockedTask {
    /// Returns Some if the task was successfully woken; None if already killed.
300
    pub fn wake(self) -> Option<Box<Task>> {
301 302
        match self {
            Owned(task) => Some(task),
303 304
            Shared(arc) => {
                match arc.swap(0, SeqCst) {
305
                    0 => None,
306
                    n => Some(unsafe { mem::transmute(n) }),
307 308
                }
            }
309 310 311
        }
    }

312 313 314 315 316 317
    /// Reawakens this task if ownership is acquired. If finer-grained control
    /// is desired, use `wake` instead.
    pub fn reawaken(self) {
        self.wake().map(|t| t.reawaken());
    }

318 319 320 321
    // This assertion has two flavours because the wake involves an atomic op.
    // In the faster version, destructors will fail dramatically instead.
    #[cfg(not(test))] pub fn trash(self) { }
    #[cfg(test)]      pub fn trash(self) { assert!(self.wake().is_none()); }
322

323
    /// Create a blocked task, unless the task was already killed.
324
    pub fn block(task: Box<Task>) -> BlockedTask {
325 326
        Owned(task)
    }
327

328
    /// Converts one blocked task handle to a list of many handles to the same.
329
    pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTasks> {
330 331
        let arc = match self {
            Owned(task) => {
A
Alex Crichton 已提交
332
                let flag = unsafe { AtomicUint::new(mem::transmute(task)) };
333
                Arc::new(flag)
334
            }
335
            Shared(arc) => arc.clone(),
336
        };
P
Palmer Cox 已提交
337
        BlockedTasks{ inner: arc }.take(num_handles)
338 339
    }

340 341 342 343
    /// Convert to an unsafe uint value. Useful for storing in a pipe's state
    /// flag.
    #[inline]
    pub unsafe fn cast_to_uint(self) -> uint {
344
        match self {
345
            Owned(task) => {
A
Alex Crichton 已提交
346
                let blocked_task_ptr: uint = mem::transmute(task);
347 348 349 350
                rtassert!(blocked_task_ptr & 0x1 == 0);
                blocked_task_ptr
            }
            Shared(arc) => {
A
Alex Crichton 已提交
351
                let blocked_task_ptr: uint = mem::transmute(box arc);
352 353
                rtassert!(blocked_task_ptr & 0x1 == 0);
                blocked_task_ptr | 0x1
354 355 356 357
            }
        }
    }

358 359 360 361 362
    /// Convert from an unsafe uint value. Useful for retrieving a pipe's state
    /// flag.
    #[inline]
    pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
        if blocked_task_ptr & 0x1 == 0 {
A
Alex Crichton 已提交
363
            Owned(mem::transmute(blocked_task_ptr))
364
        } else {
365
            let ptr: Box<Arc<AtomicUint>> =
A
Alex Crichton 已提交
366
                mem::transmute(blocked_task_ptr & !1);
367
            Shared(*ptr)
368 369 370 371
        }
    }
}

372 373 374 375
impl Death {
    pub fn new() -> Death {
        Death { on_exit: None, }
    }
376

377
    /// Collect failure exit codes from children and propagate them to a parent.
378
    pub fn collect_failure(&mut self, result: Result) {
379
        match self.on_exit.take() {
380
            Some(f) => f(result),
381
            None => {}
382 383 384 385
        }
    }
}

386 387 388
impl Drop for Death {
    fn drop(&mut self) {
        // make this type noncopyable
389 390 391
    }
}

392 393
#[cfg(test)]
mod test {
394
    use super::*;
395 396
    use std::prelude::*;
    use std::task;
397 398 399

    #[test]
    fn local_heap() {
A
Alex Crichton 已提交
400 401 402 403
        let a = @5;
        let b = a;
        assert!(*a == 5);
        assert!(*b == 5);
404
    }
405 406 407

    #[test]
    fn tls() {
408
        local_data_key!(key: @String)
409
        key.replace(Some(@"data".to_string()));
410
        assert_eq!(key.get().unwrap().as_slice(), "data");
411
        local_data_key!(key2: @String)
412
        key2.replace(Some(@"data".to_string()));
413
        assert_eq!(key2.get().unwrap().as_slice(), "data");
414
    }
415 416 417

    #[test]
    fn unwind() {
A
Alex Crichton 已提交
418 419 420 421 422 423
        let result = task::try(proc()());
        rtdebug!("trying first assert");
        assert!(result.is_ok());
        let result = task::try::<()>(proc() fail!());
        rtdebug!("trying second assert");
        assert!(result.is_err());
424
    }
425 426 427

    #[test]
    fn rng() {
428
        use std::rand::{StdRng, Rng};
429
        let mut r = StdRng::new().ok().unwrap();
A
Alex Crichton 已提交
430
        let _ = r.next_u32();
431
    }
432

433 434
    #[test]
    fn comm_stream() {
435 436 437
        let (tx, rx) = channel();
        tx.send(10);
        assert!(rx.recv() == 10);
438
    }
439

440 441
    #[test]
    fn comm_shared_chan() {
442 443 444
        let (tx, rx) = channel();
        tx.send(10);
        assert!(rx.recv() == 10);
445 446
    }

447 448
    #[test]
    fn heap_cycles() {
449
        use std::cell::RefCell;
450

A
Alex Crichton 已提交
451
        struct List {
452
            next: Option<@RefCell<List>>,
A
Alex Crichton 已提交
453
        }
454

455 456
        let a = @RefCell::new(List { next: None });
        let b = @RefCell::new(List { next: Some(a) });
457

458 459
        {
            let mut a = a.borrow_mut();
460
            a.next = Some(b);
461
        }
462
    }
463 464 465

    #[test]
    #[should_fail]
A
Alex Crichton 已提交
466
    fn test_begin_unwind() {
467
        use std::rt::unwind::begin_unwind;
A
Alex Crichton 已提交
468 469
        begin_unwind("cause", file!(), line!())
    }
470 471 472 473 474

    // Task blocking tests

    #[test]
    fn block_and_wake() {
475
        let task = box Task::new();
A
Alex Crichton 已提交
476 477
        let mut task = BlockedTask::block(task).wake().unwrap();
        task.destroyed = true;
478
    }
479
}