weak_task.rs 5.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

B
Brian Anderson 已提交
11 12 13 14 15 16 17 18 19 20
/*!
Weak tasks

Weak tasks are a runtime feature for building global services that
do not keep the runtime alive. Normally the runtime exits when all
tasks exits, but if a task is weak then the runtime may exit while
it is running, sending a notification to the task that the runtime
is trying to shut down.
*/

21 22 23
use cell::Cell;
use comm::{GenericSmartChan, stream};
use comm::{Port, Chan, SharedChan, GenericChan, GenericPort};
24
use hashmap::HashMap;
25
use option::{Some, None};
26 27 28
use unstable::at_exit::at_exit;
use unstable::finally::Finally;
use unstable::global::global_data_clone_create;
B
Brian Anderson 已提交
29
use task::rt::{task_id, get_task_id};
A
Alex Crichton 已提交
30 31 32
use task::task;

#[cfg(test)] use task::spawn;
B
Brian Anderson 已提交
33 34 35

type ShutdownMsg = ();

36
// FIXME #4729: This could be a PortOne but I've experienced bugginess
B
Brian Anderson 已提交
37 38 39 40 41
// with oneshot pipes and try_send
pub unsafe fn weaken_task(f: &fn(Port<ShutdownMsg>)) {
    let service = global_data_clone_create(global_data_key,
                                           create_global_service);
    let (shutdown_port, shutdown_chan) = stream::<ShutdownMsg>();
42
    let shutdown_port = Cell::new(shutdown_port);
B
Brian Anderson 已提交
43 44
    let task = get_task_id();
    // Expect the weak task service to be alive
P
Patrick Walton 已提交
45
    assert!(service.try_send(RegisterWeakTask(task, shutdown_chan)));
46
    rust_dec_kernel_live_count();
47
    do (|| {
48
        f(shutdown_port.take())
49
    }).finally || {
50
        rust_inc_kernel_live_count();
B
Brian Anderson 已提交
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
        // Service my have already exited
        service.send(UnregisterWeakTask(task));
    }
}

type WeakTaskService = SharedChan<ServiceMsg>;
type TaskHandle = task_id;

fn global_data_key(_v: WeakTaskService) { }

enum ServiceMsg {
    RegisterWeakTask(TaskHandle, Chan<ShutdownMsg>),
    UnregisterWeakTask(TaskHandle),
    Shutdown
}

fn create_global_service() -> ~WeakTaskService {

    debug!("creating global weak task service");
    let (port, chan) = stream::<ServiceMsg>();
71
    let port = Cell::new(port);
72
    let chan = SharedChan::new(chan);
B
Brian Anderson 已提交
73 74
    let chan_clone = chan.clone();

75 76 77
    let mut task = task();
    task.unlinked();
    do task.spawn {
B
Brian Anderson 已提交
78
        debug!("running global weak task service");
79
        let port = Cell::new(port.take());
80
        do (|| {
81
            let port = port.take();
B
Brian Anderson 已提交
82 83
            // The weak task service is itself a weak task
            debug!("weakening the weak service task");
84
            unsafe { rust_dec_kernel_live_count(); }
B
Brian Anderson 已提交
85
            run_weak_task_service(port);
86
        }).finally {
B
Brian Anderson 已提交
87
            debug!("unweakening the weak service task");
88
            unsafe { rust_inc_kernel_live_count(); }
B
Brian Anderson 已提交
89 90 91 92 93 94 95 96 97 98 99 100 101
        }
    }

    do at_exit {
        debug!("shutting down weak task service");
        chan.send(Shutdown);
    }

    return ~chan_clone;
}

fn run_weak_task_service(port: Port<ServiceMsg>) {

102
    let mut shutdown_map = HashMap::new();
B
Brian Anderson 已提交
103 104 105 106 107 108

    loop {
        match port.recv() {
            RegisterWeakTask(task, shutdown_chan) => {
                let previously_unregistered =
                    shutdown_map.insert(task, shutdown_chan);
P
Patrick Walton 已提交
109
                assert!(previously_unregistered);
B
Brian Anderson 已提交
110 111 112 113 114 115 116 117
            }
            UnregisterWeakTask(task) => {
                match shutdown_map.pop(&task) {
                    Some(shutdown_chan) => {
                        // Oneshot pipes must send, even though
                        // nobody will receive this
                        shutdown_chan.send(());
                    }
118
                    None => fail!()
B
Brian Anderson 已提交
119 120 121 122 123 124
                }
            }
            Shutdown => break
        }
    }

125
    for shutdown_map.consume().advance |(_, shutdown_chan)| {
B
Brian Anderson 已提交
126 127 128 129 130 131
        // Weak task may have already exited
        shutdown_chan.send(());
    }
}

extern {
132 133
    unsafe fn rust_inc_kernel_live_count();
    unsafe fn rust_dec_kernel_live_count();
B
Brian Anderson 已提交
134 135 136
}

#[test]
137
fn test_simple() {
B
Brian Anderson 已提交
138
    let (port, chan) = stream();
139 140 141 142
    do spawn {
        unsafe {
            do weaken_task |_signal| {
            }
B
Brian Anderson 已提交
143 144 145 146 147 148 149
        }
        chan.send(());
    }
    port.recv();
}

#[test]
150
fn test_weak_weak() {
B
Brian Anderson 已提交
151
    let (port, chan) = stream();
152 153 154 155 156 157
    do spawn {
        unsafe {
            do weaken_task |_signal| {
            }
            do weaken_task |_signal| {
            }
B
Brian Anderson 已提交
158 159 160 161 162 163 164
        }
        chan.send(());
    }
    port.recv();
}

#[test]
165 166 167 168 169 170
fn test_wait_for_signal() {
    do spawn {
        unsafe {
            do weaken_task |signal| {
                signal.recv();
            }
B
Brian Anderson 已提交
171 172 173 174 175
        }
    }
}

#[test]
176
fn test_wait_for_signal_many() {
B
Brian Anderson 已提交
177 178
    use uint;
    for uint::range(0, 100) |_| {
179 180 181 182 183
        do spawn {
            unsafe {
                do weaken_task |signal| {
                    signal.recv();
                }
B
Brian Anderson 已提交
184 185 186 187 188 189
            }
        }
    }
}

#[test]
190
fn test_select_stream_and_oneshot() {
191
    use comm::select2i;
B
Brian Anderson 已提交
192 193
    use either::{Left, Right};

194
    let (port, chan) = stream();
195
    let port = Cell::new(port);
B
Brian Anderson 已提交
196
    let (waitport, waitchan) = stream();
197 198
    do spawn {
        unsafe {
P
Patrick Walton 已提交
199 200 201
            do weaken_task |mut signal| {
                let mut port = port.take();
                match select2i(&mut port, &mut signal) {
202
                    Left(*) => (),
203
                    Right(*) => fail!()
204
                }
B
Brian Anderson 已提交
205 206 207 208 209 210 211
            }
        }
        waitchan.send(());
    }
    chan.send(());
    waitport.recv();
}