timer_timerfd.rs 11.3 KB
Newer Older
1
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Timers based on timerfd_create(2)
//!
//! On OSes which support timerfd_create, we can use these much more accurate
//! timers over select() + a timeout (see timer_other.rs). This strategy still
//! employs a worker thread which does the waiting on the timer fds (to send
//! messages away).
//!
//! The worker thread in this implementation uses epoll(7) to block. It
//! maintains a working set of *all* native timers in the process, along with a
//! pipe file descriptor used to communicate that there is data available on the
//! incoming channel to the worker thread. Timers send requests to update their
//! timerfd settings to the worker thread (see the comment above 'oneshot' for
//! why).
//!
//! As with timer_other, timers just using sleep() do not use the timerfd at
//! all. They remove the timerfd from the worker thread and then invoke
//! nanosleep() to block the calling thread.
//!
//! As with timer_other, all units in this file are in units of milliseconds.

use std::comm::Data;
32
use libc;
A
Alex Crichton 已提交
33 34 35
use std::ptr;
use std::os;
use std::rt::rtio;
36
use std::mem;
A
Alex Crichton 已提交
37 38 39 40 41 42

use io::file::FileDesc;
use io::IoResult;
use io::timer_helper;

pub struct Timer {
43 44
    fd: FileDesc,
    on_worker: bool,
A
Alex Crichton 已提交
45 46
}

47
#[allow(visible_private_types)]
A
Alex Crichton 已提交
48
pub enum Req {
49 50
    NewTimer(libc::c_int, Sender<()>, bool, imp::itimerspec),
    RemoveTimer(libc::c_int, Sender<()>),
A
Alex Crichton 已提交
51 52 53
    Shutdown,
}

54
fn helper(input: libc::c_int, messages: Receiver<Req>) {
A
Alex Crichton 已提交
55 56 57 58 59 60 61
    let efd = unsafe { imp::epoll_create(10) };
    let _fd1 = FileDesc::new(input, true);
    let _fd2 = FileDesc::new(efd, true);

    fn add(efd: libc::c_int, fd: libc::c_int) {
        let event = imp::epoll_event {
            events: imp::EPOLLIN as u32,
62
            data: fd as i64,
A
Alex Crichton 已提交
63 64 65 66 67 68 69
        };
        let ret = unsafe {
            imp::epoll_ctl(efd, imp::EPOLL_CTL_ADD, fd, &event)
        };
        assert_eq!(ret, 0);
    }
    fn del(efd: libc::c_int, fd: libc::c_int) {
70
        let event = imp::epoll_event { events: 0, data: 0 };
A
Alex Crichton 已提交
71 72 73 74 75 76 77
        let ret = unsafe {
            imp::epoll_ctl(efd, imp::EPOLL_CTL_DEL, fd, &event)
        };
        assert_eq!(ret, 0);
    }

    add(efd, input);
78
    let events: [imp::epoll_event, ..16] = unsafe { mem::init() };
79
    let mut list: ~[(libc::c_int, Sender<()>, bool)] = ~[];
A
Alex Crichton 已提交
80 81 82 83 84 85
    'outer: loop {
        let n = match unsafe {
            imp::epoll_wait(efd, events.as_ptr(),
                            events.len() as libc::c_int, -1)
        } {
            0 => fail!("epoll_wait returned immediately!"),
86
            -1 if os::errno() == libc::EINTR as int => { continue }
A
Alex Crichton 已提交
87 88 89 90 91 92
            -1 => fail!("epoll wait failed: {}", os::last_os_error()),
            n => n
        };

        let mut incoming = false;
        for event in events.slice_to(n as uint).iter() {
93
            let fd = event.data as libc::c_int;
A
Alex Crichton 已提交
94 95 96
            if fd == input {
                let mut buf = [0, ..1];
                // drain the input file descriptor of its input
A
Alex Crichton 已提交
97
                let _ = FileDesc::new(fd, false).inner_read(buf).unwrap();
A
Alex Crichton 已提交
98 99 100 101 102
                incoming = true;
            } else {
                let mut bits = [0, ..8];
                // drain the timerfd of how many times its fired
                //
103
                // FIXME: should this perform a send() this number of
A
Alex Crichton 已提交
104
                //      times?
A
Alex Crichton 已提交
105
                let _ = FileDesc::new(fd, false).inner_read(bits).unwrap();
106 107 108 109 110 111 112
                let (remove, i) = {
                    match list.bsearch(|&(f, _, _)| f.cmp(&fd)) {
                        Some(i) => {
                            let (_, ref c, oneshot) = list[i];
                            (!c.try_send(()) || oneshot, i)
                        }
                        None => fail!("fd not active: {}", fd),
A
Alex Crichton 已提交
113 114 115
                    }
                };
                if remove {
116
                    drop(list.remove(i));
A
Alex Crichton 已提交
117 118 119 120 121 122 123 124 125 126 127 128 129 130
                    del(efd, fd);
                }
            }
        }

        while incoming {
            match messages.try_recv() {
                Data(NewTimer(fd, chan, one, timeval)) => {
                    // acknowledge we have the new channel, we will never send
                    // another message to the old channel
                    chan.send(());

                    // If we haven't previously seen the file descriptor, then
                    // we need to add it to the epoll set.
131 132 133 134 135 136 137 138 139 140 141
                    match list.bsearch(|&(f, _, _)| f.cmp(&fd)) {
                        Some(i) => {
                            drop(mem::replace(&mut list[i], (fd, chan, one)));
                        }
                        None => {
                            match list.iter().position(|&(f, _, _)| f >= fd) {
                                Some(i) => list.insert(i, (fd, chan, one)),
                                None => list.push((fd, chan, one)),
                            }
                            add(efd, fd);
                        }
A
Alex Crichton 已提交
142 143 144 145 146 147 148 149 150 151 152
                    }

                    // Update the timerfd's time value now that we have control
                    // of the timerfd
                    let ret = unsafe {
                        imp::timerfd_settime(fd, 0, &timeval, ptr::null())
                    };
                    assert_eq!(ret, 0);
                }

                Data(RemoveTimer(fd, chan)) => {
153 154 155 156 157 158
                    match list.bsearch(|&(f, _, _)| f.cmp(&fd)) {
                        Some(i) => {
                            drop(list.remove(i));
                            del(efd, fd);
                        }
                        None => {}
A
Alex Crichton 已提交
159 160 161 162 163
                    }
                    chan.send(());
                }

                Data(Shutdown) => {
164
                    assert!(list.len() == 0);
A
Alex Crichton 已提交
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
                    break 'outer;
                }

                _ => break,
            }
        }
    }
}

impl Timer {
    pub fn new() -> IoResult<Timer> {
        timer_helper::boot(helper);
        match unsafe { imp::timerfd_create(imp::CLOCK_MONOTONIC, 0) } {
            -1 => Err(super::last_error()),
            n => Ok(Timer { fd: FileDesc::new(n, true), on_worker: false, }),
        }
    }

    pub fn sleep(ms: u64) {
184 185 186 187 188 189 190 191 192
        let mut to_sleep = libc::timespec {
            tv_sec: (ms / 1000) as libc::time_t,
            tv_nsec: ((ms % 1000) * 1000000) as libc::c_long,
        };
        while unsafe { libc::nanosleep(&to_sleep, &mut to_sleep) } != 0 {
            if os::errno() as int != libc::EINTR as int {
                fail!("failed to sleep, but not because of EINTR?");
            }
        }
A
Alex Crichton 已提交
193 194 195 196 197
    }

    fn remove(&mut self) {
        if !self.on_worker { return }

198 199 200
        let (tx, rx) = channel();
        timer_helper::send(RemoveTimer(self.fd.fd(), tx));
        rx.recv();
A
Alex Crichton 已提交
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
        self.on_worker = false;
    }
}

impl rtio::RtioTimer for Timer {
    fn sleep(&mut self, msecs: u64) {
        self.remove();
        Timer::sleep(msecs);
    }

    // Periodic and oneshot channels are updated by updating the settings on the
    // corresopnding timerfd. The update is not performed on the thread calling
    // oneshot or period, but rather the helper epoll thread. The reason for
    // this is to avoid losing messages and avoid leaking messages across ports.
    //
    // By updating the timerfd on the helper thread, we're guaranteed that all
    // messages for a particular setting of the timer will be received by the
    // new channel/port pair rather than leaking old messages onto the new port
    // or leaking new messages onto the old port.
    //
    // We also wait for the remote thread to actually receive the new settings
    // before returning to guarantee the invariant that when oneshot() and
    // period() return that the old port will never receive any more messages.

225 226
    fn oneshot(&mut self, msecs: u64) -> Receiver<()> {
        let (tx, rx) = channel();
A
Alex Crichton 已提交
227 228 229 230 231 232 233 234

        let new_value = imp::itimerspec {
            it_interval: imp::timespec { tv_sec: 0, tv_nsec: 0 },
            it_value: imp::timespec {
                tv_sec: (msecs / 1000) as libc::time_t,
                tv_nsec: ((msecs % 1000) * 1000000) as libc::c_long,
            }
        };
235 236
        timer_helper::send(NewTimer(self.fd.fd(), tx, true, new_value));
        rx.recv();
A
Alex Crichton 已提交
237 238
        self.on_worker = true;

239
        return rx;
A
Alex Crichton 已提交
240 241
    }

242 243
    fn period(&mut self, msecs: u64) -> Receiver<()> {
        let (tx, rx) = channel();
A
Alex Crichton 已提交
244 245 246 247 248 249

        let spec = imp::timespec {
            tv_sec: (msecs / 1000) as libc::time_t,
            tv_nsec: ((msecs % 1000) * 1000000) as libc::c_long,
        };
        let new_value = imp::itimerspec { it_interval: spec, it_value: spec, };
250 251
        timer_helper::send(NewTimer(self.fd.fd(), tx, false, new_value));
        rx.recv();
A
Alex Crichton 已提交
252 253
        self.on_worker = true;

254
        return rx;
A
Alex Crichton 已提交
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        // When the timerfd file descriptor is closed, it will be automatically
        // removed from the epoll set of the worker thread, but we want to make
        // sure that the associated channel is also removed from the worker's
        // sorted list of active timers. `remove` is a no-op if the timer is
        // not currently on the worker.
        self.remove();
    }
}

#[allow(dead_code)]
mod imp {
270
    use libc;
A
Alex Crichton 已提交
271 272 273 274 275 276 277 278 279 280 281 282 283 284

    pub static CLOCK_MONOTONIC: libc::c_int = 1;
    pub static EPOLL_CTL_ADD: libc::c_int = 1;
    pub static EPOLL_CTL_DEL: libc::c_int = 2;
    pub static EPOLL_CTL_MOD: libc::c_int = 3;
    pub static EPOLLIN: libc::c_int = 0x001;
    pub static EPOLLOUT: libc::c_int = 0x004;
    pub static EPOLLPRI: libc::c_int = 0x002;
    pub static EPOLLERR: libc::c_int = 0x008;
    pub static EPOLLRDHUP: libc::c_int = 0x2000;
    pub static EPOLLET: libc::c_int = 1 << 31;
    pub static EPOLLHUP: libc::c_int = 0x010;
    pub static EPOLLONESHOT: libc::c_int = 1 << 30;

285 286
    #[cfg(target_arch = "x86_64")]
    #[packed]
A
Alex Crichton 已提交
287
    pub struct epoll_event {
288 289
        pub events: u32,
        pub data: i64,
A
Alex Crichton 已提交
290 291
    }

292 293
    #[cfg(not(target_arch = "x86_64"))]
    pub struct epoll_event {
294 295
        pub events: u32,
        pub data: i64,
A
Alex Crichton 已提交
296 297 298
    }

    pub struct timespec {
299 300
        pub tv_sec: libc::time_t,
        pub tv_nsec: libc::c_long,
A
Alex Crichton 已提交
301 302 303
    }

    pub struct itimerspec {
304 305
        pub it_interval: timespec,
        pub it_value: timespec,
A
Alex Crichton 已提交
306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
    }

    extern {
        pub fn timerfd_create(clockid: libc::c_int,
                              flags: libc::c_int) -> libc::c_int;
        pub fn timerfd_settime(fd: libc::c_int,
                               flags: libc::c_int,
                               new_value: *itimerspec,
                               old_value: *itimerspec) -> libc::c_int;
        pub fn timerfd_gettime(fd: libc::c_int,
                               curr_value: *itimerspec) -> libc::c_int;

        pub fn epoll_create(size: libc::c_int) -> libc::c_int;
        pub fn epoll_ctl(epfd: libc::c_int,
                         op: libc::c_int,
                         fd: libc::c_int,
                         event: *epoll_event) -> libc::c_int;
        pub fn epoll_wait(epfd: libc::c_int,
                          events: *epoll_event,
                          maxevents: libc::c_int,
                          timeout: libc::c_int) -> libc::c_int;
    }
}