提交 03b324ff 编写于 作者: B bors

auto merge of #12186 : alexcrichton/rust/no-sleep-2, r=brson

Any single-threaded task benchmark will spend a good chunk of time in `kqueue()` on osx and `epoll()` on linux, and the reason for this is that each time a task is terminated it will hit the syscall. When a task terminates, it context switches back to the scheduler thread, and the scheduler thread falls out of `run_sched_once` whenever it figures out that it did some work.

If we know that `epoll()` will return nothing, then we can continue to do work locally (only while there's work to be done). We must fall back to `epoll()` whenever there's active I/O in order to check whether it's ready or not, but without that (which is largely the case in benchmarks), we can prevent the costly syscall and can get a nice speedup.

I've separated the commits into preparation for this change and then the change itself, the last commit message has more details.
...@@ -158,6 +158,8 @@ fn remote_callback(&mut self, f: ~Callback) -> ~RemoteCallback { ...@@ -158,6 +158,8 @@ fn remote_callback(&mut self, f: ~Callback) -> ~RemoteCallback {
} }
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None } fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { None }
fn has_active_io(&self) -> bool { false }
} }
struct BasicRemote { struct BasicRemote {
......
...@@ -252,12 +252,23 @@ pub fn run(mut ~self, stask: ~GreenTask) -> ~GreenTask { ...@@ -252,12 +252,23 @@ pub fn run(mut ~self, stask: ~GreenTask) -> ~GreenTask {
// * Execution Functions - Core Loop Logic // * Execution Functions - Core Loop Logic
// The model for this function is that you continue through it // This function is run from the idle callback on the uv loop, indicating
// until you either use the scheduler while performing a schedule // that there are no I/O events pending. When this function returns, we will
// action, in which case you give it away and return early, or // fall back to epoll() in the uv event loop, waiting for more things to
// you reach the end and sleep. In the case that a scheduler // happen. We may come right back off epoll() if the idle callback is still
// action is performed the loop is evented such that this function // active, in which case we're truly just polling to see if I/O events are
// is called again. // complete.
//
// The model for this function is to execute as much work as possible while
// still fairly considering I/O tasks. Falling back to epoll() frequently is
// often quite expensive, so we attempt to avoid it as much as possible. If
// we have any active I/O on the event loop, then we're forced to fall back
// to epoll() in order to provide fairness, but as long as we're doing work
// and there's no active I/O, we can continue to do work.
//
// If we try really hard to do some work, but no work is available to be
// done, then we fall back to epoll() to block this thread waiting for more
// work (instead of busy waiting).
fn run_sched_once(mut ~self, stask: ~GreenTask) { fn run_sched_once(mut ~self, stask: ~GreenTask) {
// Make sure that we're not lying in that the `stask` argument is indeed // Make sure that we're not lying in that the `stask` argument is indeed
// the scheduler task for this scheduler. // the scheduler task for this scheduler.
...@@ -269,26 +280,46 @@ fn run_sched_once(mut ~self, stask: ~GreenTask) { ...@@ -269,26 +280,46 @@ fn run_sched_once(mut ~self, stask: ~GreenTask) {
// First we check for scheduler messages, these are higher // First we check for scheduler messages, these are higher
// priority than regular tasks. // priority than regular tasks.
let (sched, stask) = let (mut sched, mut stask, mut did_work) =
match self.interpret_message_queue(stask, DontTryTooHard) { self.interpret_message_queue(stask, DontTryTooHard);
Some(pair) => pair,
None => return
};
// This helper will use a randomized work-stealing algorithm
// to find work.
let (sched, stask) = match sched.do_work(stask) {
Some(pair) => pair,
None => return
};
// Now, before sleeping we need to find out if there really // After processing a message, we consider doing some more work on the
// were any messages. Give it your best! // event loop. The "keep going" condition changes after the first
let (mut sched, stask) = // iteration becase we don't want to spin here infinitely.
match sched.interpret_message_queue(stask, GiveItYourBest) { //
Some(pair) => pair, // Once we start doing work we can keep doing work so long as the
None => return // iteration does something. Note that we don't want to starve the
// message queue here, so each iteration when we're done working we
// check the message queue regardless of whether we did work or not.
let mut keep_going = !did_work || !sched.event_loop.has_active_io();
while keep_going {
let (a, b, c) = match sched.do_work(stask) {
(sched, task, false) => {
sched.interpret_message_queue(task, GiveItYourBest)
}
(sched, task, true) => {
let (sched, task, _) =
sched.interpret_message_queue(task, GiveItYourBest);
(sched, task, true)
}
}; };
sched = a;
stask = b;
did_work = c;
// We only keep going if we managed to do something productive and
// also don't have any active I/O. If we didn't do anything, we
// should consider going to sleep, and if we have active I/O we need
// to poll for completion.
keep_going = did_work && !sched.event_loop.has_active_io();
}
// If we ever did some work, then we shouldn't put our scheduler
// entirely to sleep just yet. Leave the idle callback active and fall
// back to epoll() to see what's going on.
if did_work {
return stask.put_with_sched(sched);
}
// If we got here then there was no work to do. // If we got here then there was no work to do.
// Generate a SchedHandle and push it to the sleeper list so // Generate a SchedHandle and push it to the sleeper list so
...@@ -318,7 +349,7 @@ fn run_sched_once(mut ~self, stask: ~GreenTask) { ...@@ -318,7 +349,7 @@ fn run_sched_once(mut ~self, stask: ~GreenTask) {
// return None. // return None.
fn interpret_message_queue(mut ~self, stask: ~GreenTask, fn interpret_message_queue(mut ~self, stask: ~GreenTask,
effort: EffortLevel) effort: EffortLevel)
-> Option<(~Scheduler, ~GreenTask)> -> (~Scheduler, ~GreenTask, bool)
{ {
let msg = if effort == DontTryTooHard { let msg = if effort == DontTryTooHard {
...@@ -349,25 +380,25 @@ fn interpret_message_queue(mut ~self, stask: ~GreenTask, ...@@ -349,25 +380,25 @@ fn interpret_message_queue(mut ~self, stask: ~GreenTask,
Some(PinnedTask(task)) => { Some(PinnedTask(task)) => {
let mut task = task; let mut task = task;
task.give_home(HomeSched(self.make_handle())); task.give_home(HomeSched(self.make_handle()));
self.resume_task_immediately(stask, task).put(); let (sched, task) = self.resume_task_immediately(stask, task);
return None; (sched, task, true)
} }
Some(TaskFromFriend(task)) => { Some(TaskFromFriend(task)) => {
rtdebug!("got a task from a friend. lovely!"); rtdebug!("got a task from a friend. lovely!");
self.process_task(stask, task, let (sched, task) =
Scheduler::resume_task_immediately_cl); self.process_task(stask, task,
return None; Scheduler::resume_task_immediately_cl);
(sched, task, true)
} }
Some(RunOnce(task)) => { Some(RunOnce(task)) => {
// bypass the process_task logic to force running this task once // bypass the process_task logic to force running this task once
// on this home scheduler. This is often used for I/O (homing). // on this home scheduler. This is often used for I/O (homing).
self.resume_task_immediately(stask, task).put(); let (sched, task) = self.resume_task_immediately(stask, task);
return None; (sched, task, true)
} }
Some(Wake) => { Some(Wake) => {
self.sleepy = false; self.sleepy = false;
stask.put_with_sched(self); (self, stask, true)
return None;
} }
Some(Shutdown) => { Some(Shutdown) => {
rtdebug!("shutting down"); rtdebug!("shutting down");
...@@ -389,31 +420,30 @@ fn interpret_message_queue(mut ~self, stask: ~GreenTask, ...@@ -389,31 +420,30 @@ fn interpret_message_queue(mut ~self, stask: ~GreenTask,
// event loop references we will shut down. // event loop references we will shut down.
self.no_sleep = true; self.no_sleep = true;
self.sleepy = false; self.sleepy = false;
stask.put_with_sched(self); (self, stask, true)
return None;
} }
Some(NewNeighbor(neighbor)) => { Some(NewNeighbor(neighbor)) => {
self.work_queues.push(neighbor); self.work_queues.push(neighbor);
return Some((self, stask)); (self, stask, false)
}
None => {
return Some((self, stask));
} }
None => (self, stask, false)
} }
} }
fn do_work(mut ~self, stask: ~GreenTask) -> Option<(~Scheduler, ~GreenTask)> { fn do_work(mut ~self,
stask: ~GreenTask) -> (~Scheduler, ~GreenTask, bool) {
rtdebug!("scheduler calling do work"); rtdebug!("scheduler calling do work");
match self.find_work() { match self.find_work() {
Some(task) => { Some(task) => {
rtdebug!("found some work! running the task"); rtdebug!("found some work! running the task");
self.process_task(stask, task, let (sched, task) =
Scheduler::resume_task_immediately_cl); self.process_task(stask, task,
return None; Scheduler::resume_task_immediately_cl);
(sched, task, true)
} }
None => { None => {
rtdebug!("no work was found, returning the scheduler struct"); rtdebug!("no work was found, returning the scheduler struct");
return Some((self, stask)); (self, stask, false)
} }
} }
} }
...@@ -486,7 +516,8 @@ fn try_steals(&mut self) -> Option<~GreenTask> { ...@@ -486,7 +516,8 @@ fn try_steals(&mut self) -> Option<~GreenTask> {
// place. // place.
fn process_task(mut ~self, cur: ~GreenTask, fn process_task(mut ~self, cur: ~GreenTask,
mut next: ~GreenTask, schedule_fn: SchedulingFn) { mut next: ~GreenTask,
schedule_fn: SchedulingFn) -> (~Scheduler, ~GreenTask) {
rtdebug!("processing a task"); rtdebug!("processing a task");
match next.take_unwrap_home() { match next.take_unwrap_home() {
...@@ -495,23 +526,23 @@ fn process_task(mut ~self, cur: ~GreenTask, ...@@ -495,23 +526,23 @@ fn process_task(mut ~self, cur: ~GreenTask,
rtdebug!("sending task home"); rtdebug!("sending task home");
next.give_home(HomeSched(home_handle)); next.give_home(HomeSched(home_handle));
Scheduler::send_task_home(next); Scheduler::send_task_home(next);
cur.put_with_sched(self); (self, cur)
} else { } else {
rtdebug!("running task here"); rtdebug!("running task here");
next.give_home(HomeSched(home_handle)); next.give_home(HomeSched(home_handle));
schedule_fn(self, cur, next); schedule_fn(self, cur, next)
} }
} }
AnySched if self.run_anything => { AnySched if self.run_anything => {
rtdebug!("running anysched task here"); rtdebug!("running anysched task here");
next.give_home(AnySched); next.give_home(AnySched);
schedule_fn(self, cur, next); schedule_fn(self, cur, next)
} }
AnySched => { AnySched => {
rtdebug!("sending task to friend"); rtdebug!("sending task to friend");
next.give_home(AnySched); next.give_home(AnySched);
self.send_to_friend(next); self.send_to_friend(next);
cur.put_with_sched(self); (self, cur)
} }
} }
} }
...@@ -664,18 +695,19 @@ pub fn get_contexts<'a>(current_task: &mut GreenTask, next_task: &mut GreenTask) ...@@ -664,18 +695,19 @@ pub fn get_contexts<'a>(current_task: &mut GreenTask, next_task: &mut GreenTask)
// * Context Swapping Helpers - Here be ugliness! // * Context Swapping Helpers - Here be ugliness!
pub fn resume_task_immediately(~self, cur: ~GreenTask, pub fn resume_task_immediately(~self, cur: ~GreenTask,
next: ~GreenTask) -> ~GreenTask { next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
assert!(cur.is_sched()); assert!(cur.is_sched());
self.change_task_context(cur, next, |sched, stask| { let mut cur = self.change_task_context(cur, next, |sched, stask| {
assert!(sched.sched_task.is_none()); assert!(sched.sched_task.is_none());
sched.sched_task = Some(stask); sched.sched_task = Some(stask);
}) });
(cur.sched.take_unwrap(), cur)
} }
fn resume_task_immediately_cl(sched: ~Scheduler, fn resume_task_immediately_cl(sched: ~Scheduler,
cur: ~GreenTask, cur: ~GreenTask,
next: ~GreenTask) { next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
sched.resume_task_immediately(cur, next).put() sched.resume_task_immediately(cur, next)
} }
/// Block a running task, context switch to the scheduler, then pass the /// Block a running task, context switch to the scheduler, then pass the
...@@ -741,15 +773,17 @@ pub fn switch_running_tasks_and_then(~self, ...@@ -741,15 +773,17 @@ pub fn switch_running_tasks_and_then(~self,
cur.put(); cur.put();
} }
fn switch_task(sched: ~Scheduler, cur: ~GreenTask, next: ~GreenTask) { fn switch_task(sched: ~Scheduler, cur: ~GreenTask,
sched.change_task_context(cur, next, |sched, last_task| { next: ~GreenTask) -> (~Scheduler, ~GreenTask) {
let mut cur = sched.change_task_context(cur, next, |sched, last_task| {
if last_task.is_sched() { if last_task.is_sched() {
assert!(sched.sched_task.is_none()); assert!(sched.sched_task.is_none());
sched.sched_task = Some(last_task); sched.sched_task = Some(last_task);
} else { } else {
sched.enqueue_task(last_task); sched.enqueue_task(last_task);
} }
}).put() });
(cur.sched.take_unwrap(), cur)
} }
// * Task Context Helpers // * Task Context Helpers
...@@ -769,7 +803,9 @@ pub fn terminate_current_task(mut ~self, cur: ~GreenTask) -> ! { ...@@ -769,7 +803,9 @@ pub fn terminate_current_task(mut ~self, cur: ~GreenTask) -> ! {
} }
pub fn run_task(~self, cur: ~GreenTask, next: ~GreenTask) { pub fn run_task(~self, cur: ~GreenTask, next: ~GreenTask) {
self.process_task(cur, next, Scheduler::switch_task); let (sched, task) =
self.process_task(cur, next, Scheduler::switch_task);
task.put_with_sched(sched);
} }
pub fn run_task_later(mut cur: ~GreenTask, next: ~GreenTask) { pub fn run_task_later(mut cur: ~GreenTask, next: ~GreenTask) {
...@@ -836,7 +872,8 @@ pub fn make_handle(&mut self) -> SchedHandle { ...@@ -836,7 +872,8 @@ pub fn make_handle(&mut self) -> SchedHandle {
// Supporting types // Supporting types
type SchedulingFn = extern "Rust" fn (~Scheduler, ~GreenTask, ~GreenTask); type SchedulingFn = fn (~Scheduler, ~GreenTask, ~GreenTask)
-> (~Scheduler, ~GreenTask);
pub enum SchedMessage { pub enum SchedMessage {
Wake, Wake,
......
...@@ -86,7 +86,7 @@ pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>, ...@@ -86,7 +86,7 @@ pub fn run(loop_: &Loop, node: Option<&str>, service: Option<&str>,
req.defuse(); // uv callback now owns this request req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0, addrinfo: None }; let mut cx = Ctx { slot: None, status: 0, addrinfo: None };
wait_until_woken_after(&mut cx.slot, || { wait_until_woken_after(&mut cx.slot, loop_, || {
req.set_data(&cx); req.set_data(&cx);
}); });
......
...@@ -304,7 +304,8 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int) ...@@ -304,7 +304,8 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
0 => { 0 => {
req.fired = true; req.fired = true;
let mut slot = None; let mut slot = None;
wait_until_woken_after(&mut slot, || { let loop_ = unsafe { uvll::get_loop_from_fs_req(req.req) };
wait_until_woken_after(&mut slot, &Loop::wrap(loop_), || {
unsafe { uvll::set_data_for_req(req.req, &slot) } unsafe { uvll::set_data_for_req(req.req, &slot) }
}); });
match req.get_result() { match req.get_result() {
......
...@@ -47,7 +47,7 @@ ...@@ -47,7 +47,7 @@
use std::cast; use std::cast;
use std::io; use std::io;
use std::io::IoError; use std::io::IoError;
use std::libc::c_int; use std::libc::{c_int, c_void};
use std::ptr::null; use std::ptr::null;
use std::ptr; use std::ptr;
use std::rt::local::Local; use std::rt::local::Local;
...@@ -95,6 +95,10 @@ ...@@ -95,6 +95,10 @@
pub trait UvHandle<T> { pub trait UvHandle<T> {
fn uv_handle(&self) -> *T; fn uv_handle(&self) -> *T;
fn uv_loop(&self) -> Loop {
Loop::wrap(unsafe { uvll::get_loop_for_uv_handle(self.uv_handle()) })
}
// FIXME(#8888) dummy self // FIXME(#8888) dummy self
fn alloc(_: Option<Self>, ty: uvll::uv_handle_type) -> *T { fn alloc(_: Option<Self>, ty: uvll::uv_handle_type) -> *T {
unsafe { unsafe {
...@@ -136,7 +140,7 @@ fn close(&mut self) { ...@@ -136,7 +140,7 @@ fn close(&mut self) {
uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb); uvll::uv_close(self.uv_handle() as *uvll::uv_handle_t, close_cb);
uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>()); uvll::set_data_for_uv_handle(self.uv_handle(), ptr::null::<()>());
wait_until_woken_after(&mut slot, || { wait_until_woken_after(&mut slot, &self.uv_loop(), || {
uvll::set_data_for_uv_handle(self.uv_handle(), &slot); uvll::set_data_for_uv_handle(self.uv_handle(), &slot);
}) })
} }
...@@ -195,16 +199,20 @@ fn drop(&mut self) { ...@@ -195,16 +199,20 @@ fn drop(&mut self) {
} }
} }
fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) { fn wait_until_woken_after(slot: *mut Option<BlockedTask>,
loop_: &Loop,
f: ||) {
let _f = ForbidUnwind::new("wait_until_woken_after"); let _f = ForbidUnwind::new("wait_until_woken_after");
unsafe { unsafe {
assert!((*slot).is_none()); assert!((*slot).is_none());
let task: ~Task = Local::take(); let task: ~Task = Local::take();
loop_.modify_blockers(1);
task.deschedule(1, |task| { task.deschedule(1, |task| {
*slot = Some(task); *slot = Some(task);
f(); f();
Ok(()) Ok(())
}); });
loop_.modify_blockers(-1);
} }
} }
...@@ -273,6 +281,7 @@ impl Loop { ...@@ -273,6 +281,7 @@ impl Loop {
pub fn new() -> Loop { pub fn new() -> Loop {
let handle = unsafe { uvll::loop_new() }; let handle = unsafe { uvll::loop_new() };
assert!(handle.is_not_null()); assert!(handle.is_not_null());
unsafe { uvll::set_data_for_uv_loop(handle, 0 as *c_void) }
Loop::wrap(handle) Loop::wrap(handle)
} }
...@@ -285,6 +294,19 @@ pub fn run(&mut self) { ...@@ -285,6 +294,19 @@ pub fn run(&mut self) {
pub fn close(&mut self) { pub fn close(&mut self) {
unsafe { uvll::uv_loop_delete(self.handle) }; unsafe { uvll::uv_loop_delete(self.handle) };
} }
// The 'data' field of the uv_loop_t is used to count the number of tasks
// that are currently blocked waiting for I/O to complete.
fn modify_blockers(&self, amt: uint) {
unsafe {
let cur = uvll::get_data_for_uv_loop(self.handle) as uint;
uvll::set_data_for_uv_loop(self.handle, (cur + amt) as *c_void)
}
}
fn get_blockers(&self) -> uint {
unsafe { uvll::get_data_for_uv_loop(self.handle) as uint }
}
} }
// FIXME: Need to define the error constants like EOF so they can be // FIXME: Need to define the error constants like EOF so they can be
......
...@@ -216,7 +216,7 @@ struct Ctx { status: c_int, task: Option<BlockedTask> } ...@@ -216,7 +216,7 @@ struct Ctx { status: c_int, task: Option<BlockedTask> }
0 => { 0 => {
req.defuse(); // uv callback now owns this request req.defuse(); // uv callback now owns this request
let mut cx = Ctx { status: 0, task: None }; let mut cx = Ctx { status: 0, task: None };
wait_until_woken_after(&mut cx.task, || { wait_until_woken_after(&mut cx.task, &io.loop_, || {
req.set_data(&cx); req.set_data(&cx);
}); });
match cx.status { match cx.status {
...@@ -498,6 +498,7 @@ struct Ctx { ...@@ -498,6 +498,7 @@ struct Ctx {
buf: Option<Buf>, buf: Option<Buf>,
result: Option<(ssize_t, Option<ip::SocketAddr>)>, result: Option<(ssize_t, Option<ip::SocketAddr>)>,
} }
let loop_ = self.uv_loop();
let m = self.fire_homing_missile(); let m = self.fire_homing_missile();
let _g = self.read_access.grant(m); let _g = self.read_access.grant(m);
...@@ -511,7 +512,7 @@ struct Ctx { ...@@ -511,7 +512,7 @@ struct Ctx {
result: None, result: None,
}; };
let handle = self.handle; let handle = self.handle;
wait_until_woken_after(&mut cx.task, || { wait_until_woken_after(&mut cx.task, &loop_, || {
unsafe { uvll::set_data_for_uv_handle(handle, &cx) } unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
}); });
match cx.result.take_unwrap() { match cx.result.take_unwrap() {
...@@ -571,6 +572,7 @@ fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> { ...@@ -571,6 +572,7 @@ fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
struct Ctx { task: Option<BlockedTask>, result: c_int } struct Ctx { task: Option<BlockedTask>, result: c_int }
let m = self.fire_homing_missile(); let m = self.fire_homing_missile();
let loop_ = self.uv_loop();
let _g = self.write_access.grant(m); let _g = self.write_access.grant(m);
let mut req = Request::new(uvll::UV_UDP_SEND); let mut req = Request::new(uvll::UV_UDP_SEND);
...@@ -586,7 +588,7 @@ struct Ctx { task: Option<BlockedTask>, result: c_int } ...@@ -586,7 +588,7 @@ struct Ctx { task: Option<BlockedTask>, result: c_int }
0 => { 0 => {
req.defuse(); // uv callback now owns this request req.defuse(); // uv callback now owns this request
let mut cx = Ctx { task: None, result: 0 }; let mut cx = Ctx { task: None, result: 0 };
wait_until_woken_after(&mut cx.task, || { wait_until_woken_after(&mut cx.task, &loop_, || {
req.set_data(&cx); req.set_data(&cx);
}); });
match cx.result { match cx.result {
......
...@@ -92,7 +92,7 @@ struct Ctx { task: Option<BlockedTask>, result: libc::c_int, } ...@@ -92,7 +92,7 @@ struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
let mut req = Request::new(uvll::UV_CONNECT); let mut req = Request::new(uvll::UV_CONNECT);
let pipe = PipeWatcher::new(io, false); let pipe = PipeWatcher::new(io, false);
wait_until_woken_after(&mut cx.task, || { wait_until_woken_after(&mut cx.task, &io.loop_, || {
unsafe { unsafe {
uvll::uv_pipe_connect(req.handle, uvll::uv_pipe_connect(req.handle,
pipe.handle(), pipe.handle(),
......
...@@ -211,7 +211,7 @@ fn wait(&mut self) -> process::ProcessExit { ...@@ -211,7 +211,7 @@ fn wait(&mut self) -> process::ProcessExit {
// If there's no exit code previously listed, then the // If there's no exit code previously listed, then the
// process's exit callback has yet to be invoked. We just // process's exit callback has yet to be invoked. We just
// need to deschedule ourselves and wait to be reawoken. // need to deschedule ourselves and wait to be reawoken.
wait_until_woken_after(&mut self.to_wake, || {}); wait_until_woken_after(&mut self.to_wake, &self.uv_loop(), || {});
assert!(self.exit_status.is_some()); assert!(self.exit_status.is_some());
} }
} }
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
use std::ptr; use std::ptr;
use std::rt::task::BlockedTask; use std::rt::task::BlockedTask;
use Loop;
use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after, use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
ForbidUnwind, wakeup}; ForbidUnwind, wakeup};
use uvll; use uvll;
...@@ -87,7 +88,8 @@ pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> { ...@@ -87,7 +88,8 @@ pub fn read(&mut self, buf: &mut [u8]) -> Result<uint, UvError> {
uvll::uv_read_start(self.handle, alloc_cb, read_cb) uvll::uv_read_start(self.handle, alloc_cb, read_cb)
} { } {
0 => { 0 => {
wait_until_woken_after(&mut rcx.task, || {}); let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
wait_until_woken_after(&mut rcx.task, &Loop::wrap(loop_), || {});
match rcx.result { match rcx.result {
n if n < 0 => Err(UvError(n as c_int)), n if n < 0 => Err(UvError(n as c_int)),
n => Ok(n as uint), n => Ok(n as uint),
...@@ -121,7 +123,8 @@ pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> { ...@@ -121,7 +123,8 @@ pub fn write(&mut self, buf: &[u8]) -> Result<(), UvError> {
let mut wcx = WriteContext { result: 0, task: None, }; let mut wcx = WriteContext { result: 0, task: None, };
req.defuse(); // uv callback now owns this request req.defuse(); // uv callback now owns this request
wait_until_woken_after(&mut wcx.task, || { let loop_ = unsafe { uvll::get_loop_for_uv_handle(self.handle) };
wait_until_woken_after(&mut wcx.task, &Loop::wrap(loop_), || {
req.set_data(&wcx); req.set_data(&wcx);
}); });
self.last_write_req = Some(Request::wrap(req.handle)); self.last_write_req = Some(Request::wrap(req.handle));
......
...@@ -9,13 +9,12 @@ ...@@ -9,13 +9,12 @@
// except according to those terms. // except according to those terms.
use std::libc::c_int; use std::libc::c_int;
use std::mem::replace; use std::mem;
use std::rt::local::Local;
use std::rt::rtio::RtioTimer; use std::rt::rtio::RtioTimer;
use std::rt::task::{BlockedTask, Task}; use std::rt::task::BlockedTask;
use homing::{HomeHandle, HomingIO}; use homing::{HomeHandle, HomingIO};
use super::{UvHandle, ForbidUnwind, ForbidSwitch}; use super::{UvHandle, ForbidUnwind, ForbidSwitch, wait_until_woken_after};
use uvio::UvIoFactory; use uvio::UvIoFactory;
use uvll; use uvll;
...@@ -23,11 +22,12 @@ pub struct TimerWatcher { ...@@ -23,11 +22,12 @@ pub struct TimerWatcher {
handle: *uvll::uv_timer_t, handle: *uvll::uv_timer_t,
home: HomeHandle, home: HomeHandle,
action: Option<NextAction>, action: Option<NextAction>,
blocker: Option<BlockedTask>,
id: uint, // see comments in timer_cb id: uint, // see comments in timer_cb
} }
pub enum NextAction { pub enum NextAction {
WakeTask(BlockedTask), WakeTask,
SendOnce(Chan<()>), SendOnce(Chan<()>),
SendMany(Chan<()>, uint), SendMany(Chan<()>, uint),
} }
...@@ -41,6 +41,7 @@ pub fn new(io: &mut UvIoFactory) -> ~TimerWatcher { ...@@ -41,6 +41,7 @@ pub fn new(io: &mut UvIoFactory) -> ~TimerWatcher {
let me = ~TimerWatcher { let me = ~TimerWatcher {
handle: handle, handle: handle,
action: None, action: None,
blocker: None,
home: io.make_handle(), home: io.make_handle(),
id: 0, id: 0,
}; };
...@@ -76,7 +77,7 @@ fn sleep(&mut self, msecs: u64) { ...@@ -76,7 +77,7 @@ fn sleep(&mut self, msecs: u64) {
let missile = self.fire_homing_missile(); let missile = self.fire_homing_missile();
self.id += 1; self.id += 1;
self.stop(); self.stop();
let _missile = match replace(&mut self.action, None) { let _missile = match mem::replace(&mut self.action, None) {
None => missile, // no need to do a homing dance None => missile, // no need to do a homing dance
Some(action) => { Some(action) => {
drop(missile); // un-home ourself drop(missile); // un-home ourself
...@@ -89,11 +90,9 @@ fn sleep(&mut self, msecs: u64) { ...@@ -89,11 +90,9 @@ fn sleep(&mut self, msecs: u64) {
// started, then we need to call stop on the timer. // started, then we need to call stop on the timer.
let _f = ForbidUnwind::new("timer"); let _f = ForbidUnwind::new("timer");
let task: ~Task = Local::take(); self.action = Some(WakeTask);
task.deschedule(1, |task| { wait_until_woken_after(&mut self.blocker, &self.uv_loop(), || {
self.action = Some(WakeTask(task));
self.start(msecs, 0); self.start(msecs, 0);
Ok(())
}); });
self.stop(); self.stop();
} }
...@@ -108,7 +107,7 @@ fn oneshot(&mut self, msecs: u64) -> Port<()> { ...@@ -108,7 +107,7 @@ fn oneshot(&mut self, msecs: u64) -> Port<()> {
self.id += 1; self.id += 1;
self.stop(); self.stop();
self.start(msecs, 0); self.start(msecs, 0);
replace(&mut self.action, Some(SendOnce(chan))) mem::replace(&mut self.action, Some(SendOnce(chan)))
}; };
return port; return port;
...@@ -124,7 +123,7 @@ fn period(&mut self, msecs: u64) -> Port<()> { ...@@ -124,7 +123,7 @@ fn period(&mut self, msecs: u64) -> Port<()> {
self.id += 1; self.id += 1;
self.stop(); self.stop();
self.start(msecs, msecs); self.start(msecs, msecs);
replace(&mut self.action, Some(SendMany(chan, self.id))) mem::replace(&mut self.action, Some(SendMany(chan, self.id)))
}; };
return port; return port;
...@@ -137,7 +136,8 @@ fn period(&mut self, msecs: u64) -> Port<()> { ...@@ -137,7 +136,8 @@ fn period(&mut self, msecs: u64) -> Port<()> {
let timer: &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; let timer: &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
match timer.action.take_unwrap() { match timer.action.take_unwrap() {
WakeTask(task) => { WakeTask => {
let task = timer.blocker.take_unwrap();
let _ = task.wake().map(|t| t.reawaken()); let _ = task.wake().map(|t| t.reawaken());
} }
SendOnce(chan) => { let _ = chan.try_send(()); } SendOnce(chan) => { let _ = chan.try_send(()); }
......
...@@ -99,6 +99,10 @@ fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> { ...@@ -99,6 +99,10 @@ fn io<'a>(&'a mut self) -> Option<&'a mut rtio::IoFactory> {
let factory = &mut self.uvio as &mut rtio::IoFactory; let factory = &mut self.uvio as &mut rtio::IoFactory;
Some(factory) Some(factory)
} }
fn has_active_io(&self) -> bool {
self.uvio.loop_.get_blockers() > 0
}
} }
#[cfg(not(test))] #[cfg(not(test))]
......
...@@ -41,6 +41,7 @@ pub trait EventLoop { ...@@ -41,6 +41,7 @@ pub trait EventLoop {
/// The asynchronous I/O services. Not all event loops may provide one. /// The asynchronous I/O services. Not all event loops may provide one.
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory>; fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory>;
fn has_active_io(&self) -> bool;
} }
pub trait RemoteCallback { pub trait RemoteCallback {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册