提交 af2e0399 编写于 作者: T toddaaro

Enabled workstealing in the scheduler. Previously we had one global work queue...

Enabled workstealing in the scheduler. Previously we had one global work queue shared by each scheduler. Now there is a separate work queue for each scheduler, and work is "stolen" from other queues when it is exhausted locally.
上级 a0080f4e
......@@ -225,9 +225,10 @@ fn optimistic_check(&mut self) -> bool {
fn optimistic_check(&mut self) -> bool {
// The optimistic check is never necessary for correctness. For testing
// purposes, making it randomly return false simulates a racing sender.
use rand::{Rand, rng};
let mut rng = rng();
let actually_check = Rand::rand(&mut rng);
use rand::{Rand};
let actually_check = do Local::borrow::<Scheduler, bool> |sched| {
Rand::rand(&mut sched.rng)
};
if actually_check {
unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
} else {
......
......@@ -63,8 +63,7 @@
use cell::Cell;
use clone::Clone;
use container::Container;
use iter::Times;
use iterator::{Iterator, IteratorUtil};
use iterator::{Iterator, IteratorUtil, range};
use option::{Some, None};
use ptr::RawPtr;
use rt::local::Local;
......@@ -247,11 +246,16 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
let main = Cell::new(main);
// The shared list of sleeping schedulers. Schedulers wake each other
// occassionally to do new work.
// The shared list of sleeping schedulers.
let sleepers = SleeperList::new();
// The shared work queue. Temporary until work stealing is implemented.
let work_queue = WorkQueue::new();
// Create a work queue for each scheduler, ntimes. Create an extra
// for the main thread if that flag is set. We won't steal from it.
let mut work_queues = ~[];
for _ in range(0u, nscheds) {
let work_queue: WorkQueue<~Task> = WorkQueue::new();
work_queues.push(work_queue);
}
// The schedulers.
let mut scheds = ~[];
......@@ -259,12 +263,15 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
// sent the Shutdown message to terminate the schedulers.
let mut handles = ~[];
do nscheds.times {
for i in range(0u, nscheds) {
rtdebug!("inserting a regular scheduler");
// Every scheduler is driven by an I/O event loop.
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
let mut sched = ~Scheduler::new(loop_,
work_queues[i].clone(),
work_queues.clone(),
sleepers.clone());
let handle = sched.make_handle();
scheds.push(sched);
......@@ -280,9 +287,14 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
let friend_handle = friend_sched.make_handle();
scheds.push(friend_sched);
// This scheduler needs a queue that isn't part of the stealee
// set.
let work_queue = WorkQueue::new();
let main_loop = ~UvEventLoop::new();
let mut main_sched = ~Scheduler::new_special(main_loop,
work_queue.clone(),
work_queue,
work_queues.clone(),
sleepers.clone(),
false,
Some(friend_handle));
......@@ -371,7 +383,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, None,
home, main.take());
main_task.death.on_exit = Some(on_exit.take());
rtdebug!("boostrapping main_task");
rtdebug!("bootstrapping main_task");
main_sched.bootstrap(main_task);
}
......
......@@ -13,7 +13,6 @@
use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
use clone::Clone;
use unstable::raw;
use super::sleeper_list::SleeperList;
use super::work_queue::WorkQueue;
use super::stack::{StackPool};
......@@ -28,6 +27,9 @@
use rt::metrics::SchedMetrics;
use borrow::{to_uint};
use cell::Cell;
use rand::{XorShiftRng, RngUtil};
use iterator::{range};
use vec::{OwnedVector};
/// The Scheduler is responsible for coordinating execution of Coroutines
/// on a single thread. When the scheduler is running it is owned by
......@@ -37,9 +39,11 @@
/// XXX: This creates too many callbacks to run_sched_once, resulting
/// in too much allocation and too many events.
pub struct Scheduler {
/// A queue of available work. Under a work-stealing policy there
/// is one per Scheduler.
work_queue: WorkQueue<~Task>,
/// There are N work queues, one per scheduler.
priv work_queue: WorkQueue<~Task>,
/// Work queues for the other schedulers. These are created by
/// cloning the core work queues.
work_queues: ~[WorkQueue<~Task>],
/// The queue of incoming messages from other schedulers.
/// These are enqueued by SchedHandles after which a remote callback
/// is triggered to handle the message.
......@@ -70,7 +74,10 @@ pub struct Scheduler {
run_anything: bool,
/// If the scheduler shouldn't run some tasks, a friend to send
/// them to.
friend_handle: Option<SchedHandle>
friend_handle: Option<SchedHandle>,
/// A fast XorShift rng for scheduler use
rng: XorShiftRng
}
pub struct SchedHandle {
......@@ -97,10 +104,13 @@ pub fn sched_id(&self) -> uint { to_uint(self) }
pub fn new(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Task>,
work_queues: ~[WorkQueue<~Task>],
sleeper_list: SleeperList)
-> Scheduler {
Scheduler::new_special(event_loop, work_queue, sleeper_list, true, None)
Scheduler::new_special(event_loop, work_queue,
work_queues,
sleeper_list, true, None)
}
......@@ -108,6 +118,7 @@ pub fn new(event_loop: ~EventLoopObject,
// task field is None.
pub fn new_special(event_loop: ~EventLoopObject,
work_queue: WorkQueue<~Task>,
work_queues: ~[WorkQueue<~Task>],
sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>)
......@@ -120,12 +131,14 @@ pub fn new_special(event_loop: ~EventLoopObject,
no_sleep: false,
event_loop: event_loop,
work_queue: work_queue,
work_queues: work_queues,
stack_pool: StackPool::new(),
sched_task: None,
cleanup_job: None,
metrics: SchedMetrics::new(),
run_anything: run_anything,
friend_handle: friend
friend_handle: friend,
rng: XorShiftRng::new()
}
}
......@@ -248,7 +261,7 @@ fn run_sched_once() {
// Second activity is to try resuming a task from the queue.
let result = sched.resume_task_from_queue();
let result = sched.do_work();
let mut sched = match result {
Some(sched) => {
// Failed to dequeue a task, so we return.
......@@ -415,47 +428,98 @@ fn send_to_friend(&mut self, task: ~Task) {
}
}
// Resume a task from the queue - but also take into account that
// it might not belong here.
// Workstealing: In this iteration of the runtime each scheduler
// thread has a distinct work queue. When no work is available
// locally, make a few attempts to steal work from the queues of
// other scheduler threads. If a few steals fail we end up in the
// old "no work" path which is fine.
// First step in the process is to find a task. This function does
// that by first checking the local queue, and if there is no work
// there, trying to steal from the remote work queues.
fn find_work(&mut self) -> Option<~Task> {
rtdebug!("scheduler looking for work");
match self.work_queue.pop() {
Some(task) => {
rtdebug!("found a task locally");
return Some(task)
}
None => {
// Our naive stealing, try kinda hard.
rtdebug!("scheduler trying to steal");
let _len = self.work_queues.len();
return self.try_steals(2);
}
}
}
// With no backoff try stealing n times from the queues the
// scheduler knows about. This naive implementation can steal from
// our own queue or from other special schedulers.
fn try_steals(&mut self, n: uint) -> Option<~Task> {
for _ in range(0, n) {
let index = self.rng.gen_uint_range(0, self.work_queues.len());
let work_queues = &mut self.work_queues;
match work_queues[index].steal() {
Some(task) => {
rtdebug!("found task by stealing"); return Some(task)
}
None => ()
}
};
rtdebug!("giving up on stealing");
return None;
}
// If we perform a scheduler action we give away the scheduler ~
// pointer, if it is still available we return it.
// Given a task, execute it correctly.
fn process_task(~self, task: ~Task) -> Option<~Scheduler> {
let mut this = self;
let mut task = task;
fn resume_task_from_queue(~self) -> Option<~Scheduler> {
rtdebug!("processing a task");
let home = task.take_unwrap_home();
match home {
Sched(home_handle) => {
if home_handle.sched_id != this.sched_id() {
rtdebug!("sending task home");
task.give_home(Sched(home_handle));
Scheduler::send_task_home(task);
return Some(this);
} else {
rtdebug!("running task here");
task.give_home(Sched(home_handle));
this.resume_task_immediately(task);
return None;
}
}
AnySched if this.run_anything => {
rtdebug!("running anysched task here");
task.give_home(AnySched);
this.resume_task_immediately(task);
return None;
}
AnySched => {
rtdebug!("sending task to friend");
task.give_home(AnySched);
this.send_to_friend(task);
return Some(this);
}
}
}
// Bundle the helpers together.
fn do_work(~self) -> Option<~Scheduler> {
let mut this = self;
match this.work_queue.pop() {
rtdebug!("scheduler calling do work");
match this.find_work() {
Some(task) => {
let mut task = task;
let home = task.take_unwrap_home();
match home {
Sched(home_handle) => {
if home_handle.sched_id != this.sched_id() {
task.give_home(Sched(home_handle));
Scheduler::send_task_home(task);
return Some(this);
} else {
this.event_loop.callback(Scheduler::run_sched_once);
task.give_home(Sched(home_handle));
this.resume_task_immediately(task);
return None;
}
}
AnySched if this.run_anything => {
this.event_loop.callback(Scheduler::run_sched_once);
task.give_home(AnySched);
this.resume_task_immediately(task);
return None;
}
AnySched => {
task.give_home(AnySched);
this.send_to_friend(task);
return Some(this);
}
}
rtdebug!("found some work! processing the task");
return this.process_task(task);
}
None => {
rtdebug!("no work was found, returning the scheduler struct");
return Some(this);
}
}
......@@ -711,7 +775,6 @@ pub fn run_cleanup_job(&mut self) {
GiveTask(task, f) => f.to_fn()(self, task)
}
}
}
// The cases for the below function.
......@@ -745,6 +808,8 @@ fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { unsafe { transmute(self) } }
#[cfg(test)]
mod test {
extern mod extra;
use prelude::*;
use rt::test::*;
use unstable::run_in_bare_thread;
......@@ -862,12 +927,15 @@ fn test_schedule_home_states() {
do run_in_bare_thread {
let sleepers = SleeperList::new();
let work_queue = WorkQueue::new();
let normal_queue = WorkQueue::new();
let special_queue = WorkQueue::new();
let queues = ~[normal_queue.clone(), special_queue.clone()];
// Our normal scheduler
let mut normal_sched = ~Scheduler::new(
~UvEventLoop::new(),
work_queue.clone(),
normal_queue,
queues.clone(),
sleepers.clone());
let normal_handle = Cell::new(normal_sched.make_handle());
......@@ -877,7 +945,8 @@ fn test_schedule_home_states() {
// Our special scheduler
let mut special_sched = ~Scheduler::new_special(
~UvEventLoop::new(),
work_queue.clone(),
special_queue.clone(),
queues.clone(),
sleepers.clone(),
false,
Some(friend_handle));
......
......@@ -182,6 +182,7 @@ fn select_a_lot() {
fn select_stream() {
use util;
use comm::GenericChan;
use iter::Times;
// Sends 10 buffered packets, and uses select to retrieve them all.
// Puts the port in a different spot in the vector each time.
......@@ -265,6 +266,7 @@ fn select_racing_senders() {
fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
use rt::test::spawntask_random;
use iter::Times;
do run_in_newsched_task {
// A bit of stress, since ordinarily this is just smoke and mirrors.
......
......@@ -15,8 +15,8 @@
use clone::Clone;
use container::Container;
use iterator::{Iterator, range};
use vec::{OwnedVector, MutableVector};
use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
use vec::{OwnedVector, MutableVector, ImmutableVector};
use rt::sched::Scheduler;
use unstable::run_in_bare_thread;
use rt::thread::Thread;
......@@ -29,8 +29,12 @@
pub fn new_test_uv_sched() -> Scheduler {
let queue = WorkQueue::new();
let queues = ~[queue.clone()];
let mut sched = Scheduler::new(~UvEventLoop::new(),
WorkQueue::new(),
queue,
queues,
SleeperList::new());
// Don't wait for the Shutdown message
......@@ -164,15 +168,21 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
};
let sleepers = SleeperList::new();
let work_queue = WorkQueue::new();
let mut handles = ~[];
let mut scheds = ~[];
let mut work_queues = ~[];
for _ in range(0u, nthreads) {
let work_queue = WorkQueue::new();
work_queues.push(work_queue);
}
for i in range(0u, nthreads) {
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_,
work_queue.clone(),
work_queues[i].clone(),
work_queues.clone(),
sleepers.clone());
let handle = sched.make_handle();
......
......@@ -98,6 +98,7 @@
use rt::sched::Scheduler;
use rt::uv::uvio::UvEventLoop;
use rt::thread::Thread;
use rt::work_queue::WorkQueue;
#[cfg(test)] use task::default_task_opts;
#[cfg(test)] use comm;
......@@ -722,10 +723,16 @@ fn spawn_raw_newsched(mut opts: TaskOpts, f: ~fn()) {
let sched = Local::unsafe_borrow::<Scheduler>();
let sched_handle = (*sched).make_handle();
// Since this is a 1:1 scheduler we create a queue not in
// the stealee set. The run_anything flag is set false
// which will disable stealing.
let work_queue = WorkQueue::new();
// Create a new scheduler to hold the new task
let new_loop = ~UvEventLoop::new();
let mut new_sched = ~Scheduler::new_special(new_loop,
(*sched).work_queue.clone(),
work_queue,
(*sched).work_queues.clone(),
(*sched).sleeper_list.clone(),
false,
Some(sched_handle));
......
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
extern mod extra;
use std::task::spawn;
use std::os;
use std::uint;
use std::rt::test::spawntask_later;
use std::cell::Cell;
// This is a simple bench that creates M pairs of of tasks. These
// tasks ping-pong back and forth over a pair of streams. This is a
// cannonical message-passing benchmark as it heavily strains message
// passing and almost nothing else.
fn ping_pong_bench(n: uint, m: uint) {
// Create pairs of tasks that pingpong back and forth.
fn run_pair(n: uint) {
// Create a stream A->B
let (pa,ca) = stream::<()>();
// Create a stream B->A
let (pb,cb) = stream::<()>();
let pa = Cell::new(pa);
let ca = Cell::new(ca);
let pb = Cell::new(pb);
let cb = Cell::new(cb);
do spawntask_later() || {
let chan = ca.take();
let port = pb.take();
do n.times {
chan.send(());
port.recv();
}
}
do spawntask_later() || {
let chan = cb.take();
let port = pa.take();
do n.times {
port.recv();
chan.send(());
}
}
}
do m.times {
run_pair(n)
}
}
fn main() {
let args = os::args();
let n = if args.len() == 3 {
uint::from_str(args[1]).unwrap()
} else {
10000
};
let m = if args.len() == 3 {
uint::from_str(args[2]).unwrap()
} else {
4
};
ping_pong_bench(n, m);
}
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
extern mod extra;
use std::task::spawn;
use std::os;
use std::uint;
use std::rt::test::spawntask_later;
use std::cell::Cell;
use std::comm::*;
// A simple implementation of parfib. One subtree is found in a new
// task and communicated over a oneshot pipe, the other is found
// locally. There is no sequential-mode threshold.
fn parfib(n: uint) -> uint {
if(n == 0 || n == 1) {
return 1;
}
let (port,chan) = oneshot::<uint>();
let chan = Cell::new(chan);
do spawntask_later {
chan.take().send(parfib(n-1));
};
let m2 = parfib(n-2);
return (port.recv() + m2);
}
fn main() {
let args = os::args();
let n = if args.len() == 2 {
uint::from_str(args[1]).unwrap()
} else {
10
};
parfib(n);
}
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
extern mod extra;
use std::task::spawn;
use std::os;
use std::uint;
// Very simple spawn rate test. Spawn N tasks that do nothing and
// return.
fn main() {
let args = os::args();
let n = if args.len() == 2 {
uint::from_str(args[1]).unwrap()
} else {
100000
};
do n.times {
do spawn || {};
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册