diff --git a/src/libgreen/basic.rs b/src/libgreen/basic.rs deleted file mode 100644 index aa933f182e54511654d434499bf1daf474f33b65..0000000000000000000000000000000000000000 --- a/src/libgreen/basic.rs +++ /dev/null @@ -1,259 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! This is a basic event loop implementation not meant for any "real purposes" -//! other than testing the scheduler and proving that it's possible to have a -//! pluggable event loop. -//! -//! This implementation is also used as the fallback implementation of an event -//! loop if no other one is provided (and M:N scheduling is desired). -use self::Message::*; - -use alloc::arc::Arc; -use std::sync::atomic; -use std::mem; -use std::rt::rtio::{EventLoop, RemoteCallback}; -use std::rt::rtio::{PausableIdleCallback, Callback}; -use std::rt::exclusive::Exclusive; - -/// This is the only exported function from this module. -pub fn event_loop() -> Box { - box BasicLoop::new() as Box -} - -struct BasicLoop { - work: Vec, // pending work - remotes: Vec<(uint, Box)>, - next_remote: uint, - messages: Arc>>, - idle: Option>, - idle_active: Option>, -} - -enum Message { RunRemote(uint), RemoveRemote(uint) } - -impl BasicLoop { - fn new() -> BasicLoop { - BasicLoop { - work: vec![], - idle: None, - idle_active: None, - next_remote: 0, - remotes: vec![], - messages: Arc::new(Exclusive::new(Vec::new())), - } - } - - /// Process everything in the work queue (continually) - fn work(&mut self) { - while self.work.len() > 0 { - for work in mem::replace(&mut self.work, vec![]).into_iter() { - work(); - } - } - } - - fn remote_work(&mut self) { - let messages = unsafe { - mem::replace(&mut *self.messages.lock(), Vec::new()) - }; - for message in messages.into_iter() { - self.message(message); - } - } - - fn message(&mut self, message: Message) { - match message { - RunRemote(i) => { - match self.remotes.iter_mut().find(|& &(id, _)| id == i) { - Some(&(_, ref mut f)) => f.call(), - None => panic!("bad remote: {}", i), - } - } - RemoveRemote(i) => { - match self.remotes.iter().position(|&(id, _)| id == i) { - Some(i) => { self.remotes.remove(i).unwrap(); } - None => panic!("bad remote: {}", i), - } - } - } - } - - /// Run the idle callback if one is registered - fn idle(&mut self) { - match self.idle { - Some(ref mut idle) => { - if self.idle_active.as_ref().unwrap().load(atomic::SeqCst) { - idle.call(); - } - } - None => {} - } - } - - fn has_idle(&self) -> bool { - self.idle.is_some() && self.idle_active.as_ref().unwrap().load(atomic::SeqCst) - } -} - -impl EventLoop for BasicLoop { - fn run(&mut self) { - // Not exactly efficient, but it gets the job done. - while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() { - - self.work(); - self.remote_work(); - - if self.has_idle() { - self.idle(); - continue - } - - unsafe { - let messages = self.messages.lock(); - // We block here if we have no messages to process and we may - // receive a message at a later date - if self.remotes.len() > 0 && messages.len() == 0 && - self.work.len() == 0 { - messages.wait() - } - } - } - } - - fn callback(&mut self, f: proc():Send) { - self.work.push(f); - } - - // FIXME: Seems like a really weird requirement to have an event loop provide. - fn pausable_idle_callback(&mut self, cb: Box) - -> Box { - rtassert!(self.idle.is_none()); - self.idle = Some(cb); - let a = Arc::new(atomic::AtomicBool::new(true)); - self.idle_active = Some(a.clone()); - box BasicPausable { active: a } as Box - } - - fn remote_callback(&mut self, f: Box) - -> Box { - let id = self.next_remote; - self.next_remote += 1; - self.remotes.push((id, f)); - box BasicRemote::new(self.messages.clone(), id) as - Box - } - - fn has_active_io(&self) -> bool { false } -} - -struct BasicRemote { - queue: Arc>>, - id: uint, -} - -impl BasicRemote { - fn new(queue: Arc>>, id: uint) -> BasicRemote { - BasicRemote { queue: queue, id: id } - } -} - -impl RemoteCallback for BasicRemote { - fn fire(&mut self) { - let mut queue = unsafe { self.queue.lock() }; - queue.push(RunRemote(self.id)); - queue.signal(); - } -} - -impl Drop for BasicRemote { - fn drop(&mut self) { - let mut queue = unsafe { self.queue.lock() }; - queue.push(RemoveRemote(self.id)); - queue.signal(); - } -} - -struct BasicPausable { - active: Arc, -} - -impl PausableIdleCallback for BasicPausable { - fn pause(&mut self) { - self.active.store(false, atomic::SeqCst); - } - fn resume(&mut self) { - self.active.store(true, atomic::SeqCst); - } -} - -impl Drop for BasicPausable { - fn drop(&mut self) { - self.active.store(false, atomic::SeqCst); - } -} - -#[cfg(test)] -mod test { - use std::rt::task::TaskOpts; - - use basic; - use PoolConfig; - use SchedPool; - - fn pool() -> SchedPool { - SchedPool::new(PoolConfig { - threads: 1, - event_loop_factory: basic::event_loop, - }) - } - - fn run(f: proc():Send) { - let mut pool = pool(); - pool.spawn(TaskOpts::new(), f); - pool.shutdown(); - } - - #[test] - fn smoke() { - run(proc() {}); - } - - #[test] - fn some_channels() { - run(proc() { - let (tx, rx) = channel(); - spawn(proc() { - tx.send(()); - }); - rx.recv(); - }); - } - - #[test] - fn multi_thread() { - let mut pool = SchedPool::new(PoolConfig { - threads: 2, - event_loop_factory: basic::event_loop, - }); - - for _ in range(0u, 20) { - pool.spawn(TaskOpts::new(), proc() { - let (tx, rx) = channel(); - spawn(proc() { - tx.send(()); - }); - rx.recv(); - }); - } - - pool.shutdown(); - } -} diff --git a/src/libgreen/context.rs b/src/libgreen/context.rs deleted file mode 100644 index 2d3e85cc833f3460a4ebb4b5f30332ef4269dded..0000000000000000000000000000000000000000 --- a/src/libgreen/context.rs +++ /dev/null @@ -1,325 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use stack::Stack; -use std::uint; -use std::mem::transmute; -use std::rt::stack; -use std::raw; -#[cfg(target_arch = "x86_64")] -use std::simd; -use libc; - -// FIXME #7761: Registers is boxed so that it is 16-byte aligned, for storing -// SSE regs. It would be marginally better not to do this. In C++ we -// use an attribute on a struct. -// FIXME #7761: It would be nice to define regs as `Box>` -// since the registers are sometimes empty, but the discriminant would -// then misalign the regs again. -pub struct Context { - /// Hold the registers while the task or scheduler is suspended - regs: Box, - /// Lower bound and upper bound for the stack - stack_bounds: Option<(uint, uint)>, -} - -pub type InitFn = extern "C" fn(uint, *mut (), *mut ()) -> !; - -impl Context { - pub fn empty() -> Context { - Context { - regs: new_regs(), - stack_bounds: None, - } - } - - /// Create a new context that will resume execution by running proc() - /// - /// The `init` function will be run with `arg` and the `start` procedure - /// split up into code and env pointers. It is required that the `init` - /// function never return. - /// - /// FIXME: this is basically an awful the interface. The main reason for - /// this is to reduce the number of allocations made when a green - /// task is spawned as much as possible - pub fn new(init: InitFn, arg: uint, start: proc():Send, - stack: &mut Stack) -> Context { - - let sp: *const uint = stack.end(); - let sp: *mut uint = sp as *mut uint; - // Save and then immediately load the current context, - // which we will then modify to call the given function when restored - let mut regs = new_regs(); - - initialize_call_frame(&mut *regs, - init, - arg, - unsafe { transmute(start) }, - sp); - - // Scheduler tasks don't have a stack in the "we allocated it" sense, - // but rather they run on pthreads stacks. We have complete control over - // them in terms of the code running on them (and hopefully they don't - // overflow). Additionally, their coroutine stacks are listed as being - // zero-length, so that's how we detect what's what here. - let stack_base: *const uint = stack.start(); - let bounds = if sp as libc::uintptr_t == stack_base as libc::uintptr_t { - None - } else { - Some((stack_base as uint, sp as uint)) - }; - return Context { - regs: regs, - stack_bounds: bounds, - } - } - - /* Switch contexts - - Suspend the current execution context and resume another by - saving the registers values of the executing thread to a Context - then loading the registers from a previously saved Context. - */ - pub fn swap(out_context: &mut Context, in_context: &Context) { - rtdebug!("swapping contexts"); - let out_regs: &mut Registers = match out_context { - &Context { regs: box ref mut r, .. } => r - }; - let in_regs: &Registers = match in_context { - &Context { regs: box ref r, .. } => r - }; - - rtdebug!("noting the stack limit and doing raw swap"); - - unsafe { - // Right before we switch to the new context, set the new context's - // stack limit in the OS-specified TLS slot. This also means that - // we cannot call any more rust functions after record_stack_bounds - // returns because they would all likely panic due to the limit being - // invalid for the current task. Lucky for us `rust_swap_registers` - // is a C function so we don't have to worry about that! - match in_context.stack_bounds { - Some((lo, hi)) => stack::record_rust_managed_stack_bounds(lo, hi), - // If we're going back to one of the original contexts or - // something that's possibly not a "normal task", then reset - // the stack limit to 0 to make morestack never panic - None => stack::record_rust_managed_stack_bounds(0, uint::MAX), - } - rust_swap_registers(out_regs, in_regs); - } - } -} - -#[link(name = "context_switch", kind = "static")] -extern { - fn rust_swap_registers(out_regs: *mut Registers, in_regs: *const Registers); -} - -// Register contexts used in various architectures -// -// These structures all represent a context of one task throughout its -// execution. Each struct is a representation of the architecture's register -// set. When swapping between tasks, these register sets are used to save off -// the current registers into one struct, and load them all from another. -// -// Note that this is only used for context switching, which means that some of -// the registers may go unused. For example, for architectures with -// callee/caller saved registers, the context will only reflect the callee-saved -// registers. This is because the caller saved registers are already stored -// elsewhere on the stack (if it was necessary anyway). -// -// Additionally, there may be fields on various architectures which are unused -// entirely because they only reflect what is theoretically possible for a -// "complete register set" to show, but user-space cannot alter these registers. -// An example of this would be the segment selectors for x86. -// -// These structures/functions are roughly in-sync with the source files inside -// of src/rt/arch/$arch. The only currently used function from those folders is -// the `rust_swap_registers` function, but that's only because for now segmented -// stacks are disabled. - -#[cfg(target_arch = "x86")] -#[repr(C)] -struct Registers { - eax: u32, ebx: u32, ecx: u32, edx: u32, - ebp: u32, esi: u32, edi: u32, esp: u32, - cs: u16, ds: u16, ss: u16, es: u16, fs: u16, gs: u16, - eflags: u32, eip: u32 -} - -#[cfg(target_arch = "x86")] -fn new_regs() -> Box { - box Registers { - eax: 0, ebx: 0, ecx: 0, edx: 0, - ebp: 0, esi: 0, edi: 0, esp: 0, - cs: 0, ds: 0, ss: 0, es: 0, fs: 0, gs: 0, - eflags: 0, eip: 0 - } -} - -#[cfg(target_arch = "x86")] -fn initialize_call_frame(regs: &mut Registers, fptr: InitFn, arg: uint, - procedure: raw::Procedure, sp: *mut uint) { - let sp = sp as *mut uint; - // x86 has interesting stack alignment requirements, so do some alignment - // plus some offsetting to figure out what the actual stack should be. - let sp = align_down(sp); - let sp = mut_offset(sp, -4); - - unsafe { *mut_offset(sp, 2) = procedure.env as uint }; - unsafe { *mut_offset(sp, 1) = procedure.code as uint }; - unsafe { *mut_offset(sp, 0) = arg as uint }; - let sp = mut_offset(sp, -1); - unsafe { *sp = 0 }; // The final return address - - regs.esp = sp as u32; - regs.eip = fptr as u32; - - // Last base pointer on the stack is 0 - regs.ebp = 0; -} - -// windows requires saving more registers (both general and XMM), so the windows -// register context must be larger. -#[cfg(all(windows, target_arch = "x86_64"))] -#[repr(C)] -struct Registers { - gpr:[libc::uintptr_t, ..14], - _xmm:[simd::u32x4, ..10] -} -#[cfg(all(not(windows), target_arch = "x86_64"))] -#[repr(C)] -struct Registers { - gpr:[libc::uintptr_t, ..10], - _xmm:[simd::u32x4, ..6] -} - -#[cfg(all(windows, target_arch = "x86_64"))] -fn new_regs() -> Box { - box() Registers { - gpr:[0,..14], - _xmm:[simd::u32x4(0,0,0,0),..10] - } -} -#[cfg(all(not(windows), target_arch = "x86_64"))] -fn new_regs() -> Box { - box() Registers { - gpr:[0,..10], - _xmm:[simd::u32x4(0,0,0,0),..6] - } -} - -#[cfg(target_arch = "x86_64")] -fn initialize_call_frame(regs: &mut Registers, fptr: InitFn, arg: uint, - procedure: raw::Procedure, sp: *mut uint) { - extern { fn rust_bootstrap_green_task(); } - - // Redefinitions from rt/arch/x86_64/regs.h - static RUSTRT_RSP: uint = 1; - static RUSTRT_IP: uint = 8; - static RUSTRT_RBP: uint = 2; - static RUSTRT_R12: uint = 4; - static RUSTRT_R13: uint = 5; - static RUSTRT_R14: uint = 6; - static RUSTRT_R15: uint = 7; - - let sp = align_down(sp); - let sp = mut_offset(sp, -1); - - // The final return address. 0 indicates the bottom of the stack - unsafe { *sp = 0; } - - rtdebug!("creating call frame"); - rtdebug!("fptr {:#x}", fptr as libc::uintptr_t); - rtdebug!("arg {:#x}", arg); - rtdebug!("sp {}", sp); - - // These registers are frobbed by rust_bootstrap_green_task into the right - // location so we can invoke the "real init function", `fptr`. - regs.gpr[RUSTRT_R12] = arg as libc::uintptr_t; - regs.gpr[RUSTRT_R13] = procedure.code as libc::uintptr_t; - regs.gpr[RUSTRT_R14] = procedure.env as libc::uintptr_t; - regs.gpr[RUSTRT_R15] = fptr as libc::uintptr_t; - - // These registers are picked up by the regular context switch paths. These - // will put us in "mostly the right context" except for frobbing all the - // arguments to the right place. We have the small trampoline code inside of - // rust_bootstrap_green_task to do that. - regs.gpr[RUSTRT_RSP] = sp as libc::uintptr_t; - regs.gpr[RUSTRT_IP] = rust_bootstrap_green_task as libc::uintptr_t; - - // Last base pointer on the stack should be 0 - regs.gpr[RUSTRT_RBP] = 0; -} - -#[cfg(target_arch = "arm")] -type Registers = [libc::uintptr_t, ..32]; - -#[cfg(target_arch = "arm")] -fn new_regs() -> Box { box {[0, .. 32]} } - -#[cfg(target_arch = "arm")] -fn initialize_call_frame(regs: &mut Registers, fptr: InitFn, arg: uint, - procedure: raw::Procedure, sp: *mut uint) { - extern { fn rust_bootstrap_green_task(); } - - let sp = align_down(sp); - // sp of arm eabi is 8-byte aligned - let sp = mut_offset(sp, -2); - - // The final return address. 0 indicates the bottom of the stack - unsafe { *sp = 0; } - - // ARM uses the same technique as x86_64 to have a landing pad for the start - // of all new green tasks. Neither r1/r2 are saved on a context switch, so - // the shim will copy r3/r4 into r1/r2 and then execute the function in r5 - regs[0] = arg as libc::uintptr_t; // r0 - regs[3] = procedure.code as libc::uintptr_t; // r3 - regs[4] = procedure.env as libc::uintptr_t; // r4 - regs[5] = fptr as libc::uintptr_t; // r5 - regs[13] = sp as libc::uintptr_t; // #52 sp, r13 - regs[14] = rust_bootstrap_green_task as libc::uintptr_t; // #56 pc, r14 --> lr -} - -#[cfg(any(target_arch = "mips", target_arch = "mipsel"))] -type Registers = [libc::uintptr_t, ..32]; - -#[cfg(any(target_arch = "mips", target_arch = "mipsel"))] -fn new_regs() -> Box { box {[0, .. 32]} } - -#[cfg(any(target_arch = "mips", target_arch = "mipsel"))] -fn initialize_call_frame(regs: &mut Registers, fptr: InitFn, arg: uint, - procedure: raw::Procedure, sp: *mut uint) { - let sp = align_down(sp); - // sp of mips o32 is 8-byte aligned - let sp = mut_offset(sp, -2); - - // The final return address. 0 indicates the bottom of the stack - unsafe { *sp = 0; } - - regs[4] = arg as libc::uintptr_t; - regs[5] = procedure.code as libc::uintptr_t; - regs[6] = procedure.env as libc::uintptr_t; - regs[29] = sp as libc::uintptr_t; - regs[25] = fptr as libc::uintptr_t; - regs[31] = fptr as libc::uintptr_t; -} - -fn align_down(sp: *mut uint) -> *mut uint { - let sp = (sp as uint) & !(16 - 1); - sp as *mut uint -} - -// ptr::mut_offset is positive ints only -#[inline] -pub fn mut_offset(ptr: *mut T, count: int) -> *mut T { - use std::mem::size_of; - (ptr as int + count * (size_of::() as int)) as *mut T -} diff --git a/src/libgreen/coroutine.rs b/src/libgreen/coroutine.rs deleted file mode 100644 index f2e64dc25a970423d442a5e3e8dbda92250ebb0f..0000000000000000000000000000000000000000 --- a/src/libgreen/coroutine.rs +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -// Coroutines represent nothing more than a context and a stack -// segment. - -use context::Context; -use stack::{StackPool, Stack}; - -/// A coroutine is nothing more than a (register context, stack) pair. -pub struct Coroutine { - /// The segment of stack on which the task is currently running or - /// if the task is blocked, on which the task will resume - /// execution. - /// - /// Servo needs this to be public in order to tell SpiderMonkey - /// about the stack bounds. - pub current_stack_segment: Stack, - - /// Always valid if the task is alive and not running. - pub saved_context: Context -} - -impl Coroutine { - pub fn empty() -> Coroutine { - Coroutine { - current_stack_segment: unsafe { Stack::dummy_stack() }, - saved_context: Context::empty() - } - } - - /// Destroy coroutine and try to reuse std::stack segment. - pub fn recycle(self, stack_pool: &mut StackPool) { - let Coroutine { current_stack_segment, .. } = self; - stack_pool.give_stack(current_stack_segment); - } -} diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs deleted file mode 100644 index 4e2908dd2b025b16fde151f25dfc7cbbc3ae8f38..0000000000000000000000000000000000000000 --- a/src/libgreen/lib.rs +++ /dev/null @@ -1,567 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! The "green scheduling" library -//! -//! This library provides M:N threading for rust programs. Internally this has -//! the implementation of a green scheduler along with context switching and a -//! stack-allocation strategy. This can be optionally linked in to rust -//! programs in order to provide M:N functionality inside of 1:1 programs. -//! -//! # Architecture -//! -//! An M:N scheduling library implies that there are N OS thread upon which M -//! "green threads" are multiplexed. In other words, a set of green threads are -//! all run inside a pool of OS threads. -//! -//! With this design, you can achieve _concurrency_ by spawning many green -//! threads, and you can achieve _parallelism_ by running the green threads -//! simultaneously on multiple OS threads. Each OS thread is a candidate for -//! being scheduled on a different core (the source of parallelism), and then -//! all of the green threads cooperatively schedule amongst one another (the -//! source of concurrency). -//! -//! ## Schedulers -//! -//! In order to coordinate among green threads, each OS thread is primarily -//! running something which we call a Scheduler. Whenever a reference to a -//! Scheduler is made, it is synonymous to referencing one OS thread. Each -//! scheduler is bound to one and exactly one OS thread, and the thread that it -//! is bound to never changes. -//! -//! Each scheduler is connected to a pool of other schedulers (a `SchedPool`) -//! which is the thread pool term from above. A pool of schedulers all share the -//! work that they create. Furthermore, whenever a green thread is created (also -//! synonymously referred to as a green task), it is associated with a -//! `SchedPool` forevermore. A green thread cannot leave its scheduler pool. -//! -//! Schedulers can have at most one green thread running on them at a time. When -//! a scheduler is asleep on its event loop, there are no green tasks running on -//! the OS thread or the scheduler. The term "context switch" is used for when -//! the running green thread is swapped out, but this simply changes the one -//! green thread which is running on the scheduler. -//! -//! ## Green Threads -//! -//! A green thread can largely be summarized by a stack and a register context. -//! Whenever a green thread is spawned, it allocates a stack, and then prepares -//! a register context for execution. The green task may be executed across -//! multiple OS threads, but it will always use the same stack and it will carry -//! its register context across OS threads. -//! -//! Each green thread is cooperatively scheduled with other green threads. -//! Primarily, this means that there is no pre-emption of a green thread. The -//! major consequence of this design is that a green thread stuck in an infinite -//! loop will prevent all other green threads from running on that particular -//! scheduler. -//! -//! Scheduling events for green threads occur on communication and I/O -//! boundaries. For example, if a green task blocks waiting for a message on a -//! channel some other green thread can now run on the scheduler. This also has -//! the consequence that until a green thread performs any form of scheduling -//! event, it will be running on the same OS thread (unconditionally). -//! -//! ## Work Stealing -//! -//! With a pool of schedulers, a new green task has a number of options when -//! deciding where to run initially. The current implementation uses a concept -//! called work stealing in order to spread out work among schedulers. -//! -//! In a work-stealing model, each scheduler maintains a local queue of tasks to -//! run, and this queue is stolen from by other schedulers. Implementation-wise, -//! work stealing has some hairy parts, but from a user-perspective, work -//! stealing simply implies what with M green threads and N schedulers where -//! M > N it is very likely that all schedulers will be busy executing work. -//! -//! # Considerations when using libgreen -//! -//! An M:N runtime has both pros and cons, and there is no one answer as to -//! whether M:N or 1:1 is appropriate to use. As always, there are many -//! advantages and disadvantages between the two. Regardless of the workload, -//! however, there are some aspects of using green thread which you should be -//! aware of: -//! -//! * The largest concern when using libgreen is interoperating with native -//! code. Care should be taken when calling native code that will block the OS -//! thread as it will prevent further green tasks from being scheduled on the -//! OS thread. -//! -//! * Native code using thread-local-storage should be approached -//! with care. Green threads may migrate among OS threads at any time, so -//! native libraries using thread-local state may not always work. -//! -//! * Native synchronization primitives (e.g. pthread mutexes) will also not -//! work for green threads. The reason for this is because native primitives -//! often operate on a _os thread_ granularity whereas green threads are -//! operating on a more granular unit of work. -//! -//! * A green threading runtime is not fork-safe. If the process forks(), it -//! cannot expect to make reasonable progress by continuing to use green -//! threads. -//! -//! Note that these concerns do not mean that operating with native code is a -//! lost cause. These are simply just concerns which should be considered when -//! invoking native code. -//! -//! # Starting with libgreen -//! -//! ```rust -//! extern crate green; -//! -//! #[start] -//! fn start(argc: int, argv: *const *const u8) -> int { -//! green::start(argc, argv, green::basic::event_loop, main) -//! } -//! -//! fn main() { -//! // this code is running in a pool of schedulers -//! } -//! ``` -//! -//! > **Note**: This `main` function in this example does *not* have I/O -//! > support. The basic event loop does not provide any support -//! -//! # Using a scheduler pool -//! -//! This library adds a `GreenTaskBuilder` trait that extends the methods -//! available on `std::task::TaskBuilder` to allow spawning a green task, -//! possibly pinned to a particular scheduler thread: -//! -//! ```rust -//! extern crate green; -//! -//! # fn main() { -//! use std::task::TaskBuilder; -//! use green::{SchedPool, PoolConfig, GreenTaskBuilder}; -//! -//! let mut config = PoolConfig::new(); -//! -//! let mut pool = SchedPool::new(config); -//! -//! // Spawn tasks into the pool of schedulers -//! TaskBuilder::new().green(&mut pool).spawn(proc() { -//! // this code is running inside the pool of schedulers -//! -//! spawn(proc() { -//! // this code is also running inside the same scheduler pool -//! }); -//! }); -//! -//! // Dynamically add a new scheduler to the scheduler pool. This adds another -//! // OS thread that green threads can be multiplexed on to. -//! let mut handle = pool.spawn_sched(); -//! -//! // Pin a task to the spawned scheduler -//! TaskBuilder::new().green_pinned(&mut pool, &mut handle).spawn(proc() { -//! /* ... */ -//! }); -//! -//! // Handles keep schedulers alive, so be sure to drop all handles before -//! // destroying the sched pool -//! drop(handle); -//! -//! // Required to shut down this scheduler pool. -//! // The task will panic if `shutdown` is not called. -//! pool.shutdown(); -//! # } -//! ``` - -#![crate_name = "green"] -#![experimental] -#![license = "MIT/ASL2"] -#![crate_type = "rlib"] -#![crate_type = "dylib"] -#![doc(html_logo_url = "http://www.rust-lang.org/logos/rust-logo-128x128-blk-v2.png", - html_favicon_url = "http://www.rust-lang.org/favicon.ico", - html_root_url = "http://doc.rust-lang.org/nightly/", - html_playground_url = "http://play.rust-lang.org/")] - -#![feature(macro_rules, phase, default_type_params, globs)] -#![allow(deprecated)] - -#[cfg(test)] #[phase(plugin, link)] extern crate log; -extern crate libc; -extern crate alloc; - -use alloc::arc::Arc; -use std::mem::replace; -use std::os; -use std::rt::rtio; -use std::rt::thread::Thread; -use std::rt::task::TaskOpts; -use std::rt; -use std::sync::atomic::{SeqCst, AtomicUint, INIT_ATOMIC_UINT}; -use std::sync::deque; -use std::task::{TaskBuilder, Spawner}; - -use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, PinnedTask, NewNeighbor}; -use sleeper_list::SleeperList; -use stack::StackPool; -use task::GreenTask; - -mod macros; -mod simple; -mod message_queue; - -pub mod basic; -pub mod context; -pub mod coroutine; -pub mod sched; -pub mod sleeper_list; -pub mod stack; -pub mod task; - -/// Set up a default runtime configuration, given compiler-supplied arguments. -/// -/// This function will block until the entire pool of M:N schedulers have -/// exited. This function also requires a local task to be available. -/// -/// # Arguments -/// -/// * `argc` & `argv` - The argument vector. On Unix this information is used -/// by os::args. -/// * `main` - The initial procedure to run inside of the M:N scheduling pool. -/// Once this procedure exits, the scheduling pool will begin to shut -/// down. The entire pool (and this function) will only return once -/// all child tasks have finished executing. -/// -/// # Return value -/// -/// The return value is used as the process return code. 0 on success, 101 on -/// error. -pub fn start(argc: int, argv: *const *const u8, - event_loop_factory: fn() -> Box, - main: proc():Send) -> int { - rt::init(argc, argv); - let mut main = Some(main); - let mut ret = None; - simple::task().run(|| { - ret = Some(run(event_loop_factory, main.take().unwrap())); - }).destroy(); - // unsafe is ok b/c we're sure that the runtime is gone - unsafe { rt::cleanup() } - ret.unwrap() -} - -/// Execute the main function in a pool of M:N schedulers. -/// -/// Configures the runtime according to the environment, by default using a task -/// scheduler with the same number of threads as cores. Returns a process exit -/// code. -/// -/// This function will not return until all schedulers in the associated pool -/// have returned. -pub fn run(event_loop_factory: fn() -> Box, - main: proc():Send) -> int { - // Create a scheduler pool and spawn the main task into this pool. We will - // get notified over a channel when the main task exits. - let mut cfg = PoolConfig::new(); - cfg.event_loop_factory = event_loop_factory; - let mut pool = SchedPool::new(cfg); - let (tx, rx) = channel(); - let mut opts = TaskOpts::new(); - opts.on_exit = Some(proc(r) tx.send(r)); - opts.name = Some("
".into_maybe_owned()); - pool.spawn(opts, main); - - // Wait for the main task to return, and set the process error code - // appropriately. - if rx.recv().is_err() { - os::set_exit_status(rt::DEFAULT_ERROR_CODE); - } - - // Now that we're sure all tasks are dead, shut down the pool of schedulers, - // waiting for them all to return. - pool.shutdown(); - os::get_exit_status() -} - -/// Configuration of how an M:N pool of schedulers is spawned. -pub struct PoolConfig { - /// The number of schedulers (OS threads) to spawn into this M:N pool. - pub threads: uint, - /// A factory function used to create new event loops. If this is not - /// specified then the default event loop factory is used. - pub event_loop_factory: fn() -> Box, -} - -impl PoolConfig { - /// Returns the default configuration, as determined the environment - /// variables of this process. - pub fn new() -> PoolConfig { - PoolConfig { - threads: rt::default_sched_threads(), - event_loop_factory: basic::event_loop, - } - } -} - -/// A structure representing a handle to a pool of schedulers. This handle is -/// used to keep the pool alive and also reap the status from the pool. -pub struct SchedPool { - id: uint, - threads: Vec>, - handles: Vec, - stealers: Vec>>, - next_friend: uint, - stack_pool: StackPool, - deque_pool: deque::BufferPool>, - sleepers: SleeperList, - factory: fn() -> Box, - task_state: TaskState, - tasks_done: Receiver<()>, -} - -/// This is an internal state shared among a pool of schedulers. This is used to -/// keep track of how many tasks are currently running in the pool and then -/// sending on a channel once the entire pool has been drained of all tasks. -#[deriving(Clone)] -pub struct TaskState { - cnt: Arc, - done: Sender<()>, -} - -impl SchedPool { - /// Execute the main function in a pool of M:N schedulers. - /// - /// This will configure the pool according to the `config` parameter, and - /// initially run `main` inside the pool of schedulers. - pub fn new(config: PoolConfig) -> SchedPool { - static POOL_ID: AtomicUint = INIT_ATOMIC_UINT; - - let PoolConfig { - threads: nscheds, - event_loop_factory: factory - } = config; - assert!(nscheds > 0); - - // The pool of schedulers that will be returned from this function - let (p, state) = TaskState::new(); - let mut pool = SchedPool { - threads: vec![], - handles: vec![], - stealers: vec![], - id: POOL_ID.fetch_add(1, SeqCst), - sleepers: SleeperList::new(), - stack_pool: StackPool::new(), - deque_pool: deque::BufferPool::new(), - next_friend: 0, - factory: factory, - task_state: state, - tasks_done: p, - }; - - // Create a work queue for each scheduler, ntimes. Create an extra - // for the main thread if that flag is set. We won't steal from it. - let mut workers = Vec::with_capacity(nscheds); - let mut stealers = Vec::with_capacity(nscheds); - - for _ in range(0, nscheds) { - let (w, s) = pool.deque_pool.deque(); - workers.push(w); - stealers.push(s); - } - pool.stealers = stealers; - - // Now that we've got all our work queues, create one scheduler per - // queue, spawn the scheduler into a thread, and be sure to keep a - // handle to the scheduler and the thread to keep them alive. - for worker in workers.into_iter() { - rtdebug!("inserting a regular scheduler"); - - let mut sched = box Scheduler::new(pool.id, - (pool.factory)(), - worker, - pool.stealers.clone(), - pool.sleepers.clone(), - pool.task_state.clone()); - pool.handles.push(sched.make_handle()); - pool.threads.push(Thread::start(proc() { sched.bootstrap(); })); - } - - return pool; - } - - /// Creates a new task configured to run inside of this pool of schedulers. - /// This is useful to create a task which can then be sent to a specific - /// scheduler created by `spawn_sched` (and possibly pin it to that - /// scheduler). - #[deprecated = "use the green and green_pinned methods of GreenTaskBuilder instead"] - pub fn task(&mut self, opts: TaskOpts, f: proc():Send) -> Box { - GreenTask::configure(&mut self.stack_pool, opts, f) - } - - /// Spawns a new task into this pool of schedulers, using the specified - /// options to configure the new task which is spawned. - /// - /// New tasks are spawned in a round-robin fashion to the schedulers in this - /// pool, but tasks can certainly migrate among schedulers once they're in - /// the pool. - #[deprecated = "use the green and green_pinned methods of GreenTaskBuilder instead"] - pub fn spawn(&mut self, opts: TaskOpts, f: proc():Send) { - let task = self.task(opts, f); - - // Figure out someone to send this task to - let idx = self.next_friend; - self.next_friend += 1; - if self.next_friend >= self.handles.len() { - self.next_friend = 0; - } - - // Jettison the task away! - self.handles[idx].send(TaskFromFriend(task)); - } - - /// Spawns a new scheduler into this M:N pool. A handle is returned to the - /// scheduler for use. The scheduler will not exit as long as this handle is - /// active. - /// - /// The scheduler spawned will participate in work stealing with all of the - /// other schedulers currently in the scheduler pool. - pub fn spawn_sched(&mut self) -> SchedHandle { - let (worker, stealer) = self.deque_pool.deque(); - self.stealers.push(stealer.clone()); - - // Tell all existing schedulers about this new scheduler so they can all - // steal work from it - for handle in self.handles.iter_mut() { - handle.send(NewNeighbor(stealer.clone())); - } - - // Create the new scheduler, using the same sleeper list as all the - // other schedulers as well as having a stealer handle to all other - // schedulers. - let mut sched = box Scheduler::new(self.id, - (self.factory)(), - worker, - self.stealers.clone(), - self.sleepers.clone(), - self.task_state.clone()); - let ret = sched.make_handle(); - self.handles.push(sched.make_handle()); - self.threads.push(Thread::start(proc() { sched.bootstrap() })); - - return ret; - } - - /// Consumes the pool of schedulers, waiting for all tasks to exit and all - /// schedulers to shut down. - /// - /// This function is required to be called in order to drop a pool of - /// schedulers, it is considered an error to drop a pool without calling - /// this method. - /// - /// This only waits for all tasks in *this pool* of schedulers to exit, any - /// native tasks or extern pools will not be waited on - pub fn shutdown(mut self) { - self.stealers = vec![]; - - // Wait for everyone to exit. We may have reached a 0-task count - // multiple times in the past, meaning there could be several buffered - // messages on the `tasks_done` port. We're guaranteed that after *some* - // message the current task count will be 0, so we just receive in a - // loop until everything is totally dead. - while self.task_state.active() { - self.tasks_done.recv(); - } - - // Now that everyone's gone, tell everything to shut down. - for mut handle in replace(&mut self.handles, vec![]).into_iter() { - handle.send(Shutdown); - } - for thread in replace(&mut self.threads, vec![]).into_iter() { - thread.join(); - } - } -} - -impl TaskState { - fn new() -> (Receiver<()>, TaskState) { - let (tx, rx) = channel(); - (rx, TaskState { - cnt: Arc::new(AtomicUint::new(0)), - done: tx, - }) - } - - fn increment(&mut self) { - self.cnt.fetch_add(1, SeqCst); - } - - fn active(&self) -> bool { - self.cnt.load(SeqCst) != 0 - } - - fn decrement(&mut self) { - let prev = self.cnt.fetch_sub(1, SeqCst); - if prev == 1 { - self.done.send(()); - } - } -} - -impl Drop for SchedPool { - fn drop(&mut self) { - if self.threads.len() > 0 { - panic!("dropping a M:N scheduler pool that wasn't shut down"); - } - } -} - -/// A spawner for green tasks -pub struct GreenSpawner<'a>{ - pool: &'a mut SchedPool, - handle: Option<&'a mut SchedHandle> -} - -impl<'a> Spawner for GreenSpawner<'a> { - #[inline] - fn spawn(self, opts: TaskOpts, f: proc():Send) { - let GreenSpawner { pool, handle } = self; - match handle { - None => pool.spawn(opts, f), - Some(h) => h.send(PinnedTask(pool.task(opts, f))) - } - } -} - -/// An extension trait adding `green` configuration methods to `TaskBuilder`. -pub trait GreenTaskBuilder { - fn green<'a>(self, &'a mut SchedPool) -> TaskBuilder>; - fn green_pinned<'a>(self, &'a mut SchedPool, &'a mut SchedHandle) - -> TaskBuilder>; -} - -impl GreenTaskBuilder for TaskBuilder { - fn green<'a>(self, pool: &'a mut SchedPool) -> TaskBuilder> { - self.spawner(GreenSpawner {pool: pool, handle: None}) - } - - fn green_pinned<'a>(self, pool: &'a mut SchedPool, handle: &'a mut SchedHandle) - -> TaskBuilder> { - self.spawner(GreenSpawner {pool: pool, handle: Some(handle)}) - } -} - -#[cfg(test)] -mod test { - use std::task::TaskBuilder; - use super::{SchedPool, PoolConfig, GreenTaskBuilder}; - - #[test] - fn test_green_builder() { - let mut pool = SchedPool::new(PoolConfig::new()); - let res = TaskBuilder::new().green(&mut pool).try(proc() { - "Success!".to_string() - }); - assert_eq!(res.ok().unwrap(), "Success!".to_string()); - pool.shutdown(); - } -} diff --git a/src/libgreen/macros.rs b/src/libgreen/macros.rs deleted file mode 100644 index 4cce430d88a8d3522e1e75e776a64ab1a29cd97b..0000000000000000000000000000000000000000 --- a/src/libgreen/macros.rs +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -// FIXME: this file probably shouldn't exist -// ignore-lexer-test FIXME #15677 - -#![macro_escape] - -use std::fmt; - -// Indicates whether we should perform expensive sanity checks, including rtassert! -// FIXME: Once the runtime matures remove the `true` below to turn off rtassert, etc. -pub static ENFORCE_SANITY: bool = true || !cfg!(rtopt) || cfg!(rtdebug) || cfg!(rtassert); - -macro_rules! rterrln ( - ($($arg:tt)*) => ( { - format_args!(::macros::dumb_println, $($arg)*) - } ) -) - -// Some basic logging. Enabled by passing `--cfg rtdebug` to the libstd build. -macro_rules! rtdebug ( - ($($arg:tt)*) => ( { - if cfg!(rtdebug) { - rterrln!($($arg)*) - } - }) -) - -macro_rules! rtassert ( - ( $arg:expr ) => ( { - if ::macros::ENFORCE_SANITY { - if !$arg { - rtabort!(" assertion failed: {}", stringify!($arg)); - } - } - } ) -) - - -macro_rules! rtabort ( - ($($arg:tt)*) => ( { - ::macros::abort(format!($($arg)*).as_slice()); - } ) -) - -pub fn dumb_println(args: &fmt::Arguments) { - use std::rt; - let mut w = rt::Stderr; - let _ = writeln!(&mut w, "{}", args); -} - -pub fn abort(msg: &str) -> ! { - let msg = if !msg.is_empty() { msg } else { "aborted" }; - let hash = msg.chars().fold(0, |accum, val| accum + (val as uint) ); - let quote = match hash % 10 { - 0 => " -It was from the artists and poets that the pertinent answers came, and I -know that panic would have broken loose had they been able to compare notes. -As it was, lacking their original letters, I half suspected the compiler of -having asked leading questions, or of having edited the correspondence in -corroboration of what he had latently resolved to see.", - 1 => " -There are not many persons who know what wonders are opened to them in the -stories and visions of their youth; for when as children we listen and dream, -we think but half-formed thoughts, and when as men we try to remember, we are -dulled and prosaic with the poison of life. But some of us awake in the night -with strange phantasms of enchanted hills and gardens, of fountains that sing -in the sun, of golden cliffs overhanging murmuring seas, of plains that stretch -down to sleeping cities of bronze and stone, and of shadowy companies of heroes -that ride caparisoned white horses along the edges of thick forests; and then -we know that we have looked back through the ivory gates into that world of -wonder which was ours before we were wise and unhappy.", - 2 => " -Instead of the poems I had hoped for, there came only a shuddering blackness -and ineffable loneliness; and I saw at last a fearful truth which no one had -ever dared to breathe before — the unwhisperable secret of secrets — The fact -that this city of stone and stridor is not a sentient perpetuation of Old New -York as London is of Old London and Paris of Old Paris, but that it is in fact -quite dead, its sprawling body imperfectly embalmed and infested with queer -animate things which have nothing to do with it as it was in life.", - 3 => " -The ocean ate the last of the land and poured into the smoking gulf, thereby -giving up all it had ever conquered. From the new-flooded lands it flowed -again, uncovering death and decay; and from its ancient and immemorial bed it -trickled loathsomely, uncovering nighted secrets of the years when Time was -young and the gods unborn. Above the waves rose weedy remembered spires. The -moon laid pale lilies of light on dead London, and Paris stood up from its damp -grave to be sanctified with star-dust. Then rose spires and monoliths that were -weedy but not remembered; terrible spires and monoliths of lands that men never -knew were lands...", - 4 => " -There was a night when winds from unknown spaces whirled us irresistibly into -limitless vacuum beyond all thought and entity. Perceptions of the most -maddeningly untransmissible sort thronged upon us; perceptions of infinity -which at the time convulsed us with joy, yet which are now partly lost to my -memory and partly incapable of presentation to others.", - _ => "You've met with a terrible fate, haven't you?" - }; - rterrln!("{}", ""); - rterrln!("{}", quote); - rterrln!("{}", ""); - rterrln!("fatal runtime error: {}", msg); - - abort(); - - fn abort() -> ! { - use std::intrinsics; - unsafe { intrinsics::abort() } - } -} diff --git a/src/libgreen/message_queue.rs b/src/libgreen/message_queue.rs deleted file mode 100644 index c589a9fb592d8804d1407b0adabd1575005d6dd3..0000000000000000000000000000000000000000 --- a/src/libgreen/message_queue.rs +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -pub use self::PopResult::*; - -use alloc::arc::Arc; -use std::sync::mpsc_queue as mpsc; -use std::kinds::marker; - -pub enum PopResult { - Inconsistent, - Empty, - Data(T), -} - -pub fn queue() -> (Consumer, Producer) { - let a = Arc::new(mpsc::Queue::new()); - (Consumer { inner: a.clone(), noshare: marker::NoSync }, - Producer { inner: a, noshare: marker::NoSync }) -} - -pub struct Producer { - inner: Arc>, - noshare: marker::NoSync, -} - -pub struct Consumer { - inner: Arc>, - noshare: marker::NoSync, -} - -impl Consumer { - pub fn pop(&self) -> PopResult { - match self.inner.pop() { - mpsc::Inconsistent => Inconsistent, - mpsc::Empty => Empty, - mpsc::Data(t) => Data(t), - } - } - - pub fn casual_pop(&self) -> Option { - match self.inner.pop() { - mpsc::Inconsistent => None, - mpsc::Empty => None, - mpsc::Data(t) => Some(t), - } - } -} - -impl Producer { - pub fn push(&self, t: T) { - self.inner.push(t); - } -} - -impl Clone for Producer { - fn clone(&self) -> Producer { - Producer { inner: self.inner.clone(), noshare: marker::NoSync } - } -} diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs deleted file mode 100644 index e8cb65d35df6a1c1434f7bc91db12a440866f2ad..0000000000000000000000000000000000000000 --- a/src/libgreen/sched.rs +++ /dev/null @@ -1,1523 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -pub use self::SchedMessage::*; -use self::EffortLevel::*; - -use std::mem; -use std::rt::local::Local; -use std::rt::mutex::NativeMutex; -use std::rt::rtio::{RemoteCallback, PausableIdleCallback, Callback, EventLoop}; -use std::rt::task::BlockedTask; -use std::rt::task::Task; -use std::sync::deque; -use std::raw; - -use std::rand::{XorShiftRng, Rng, Rand}; - -use TaskState; -use context::Context; -use coroutine::Coroutine; -use sleeper_list::SleeperList; -use stack::StackPool; -use task::{TypeSched, GreenTask, HomeSched, AnySched}; -use message_queue as msgq; - -/// A scheduler is responsible for coordinating the execution of Tasks -/// on a single thread. The scheduler runs inside a slightly modified -/// Rust Task. When not running this task is stored in the scheduler -/// struct. The scheduler struct acts like a baton, all scheduling -/// actions are transfers of the baton. -/// -/// FIXME: This creates too many callbacks to run_sched_once, resulting -/// in too much allocation and too many events. -pub struct Scheduler { - /// ID number of the pool that this scheduler is a member of. When - /// reawakening green tasks, this is used to ensure that tasks aren't - /// reawoken on the wrong pool of schedulers. - pub pool_id: uint, - /// The pool of stacks that this scheduler has cached - pub stack_pool: StackPool, - /// Bookkeeping for the number of tasks which are currently running around - /// inside this pool of schedulers - pub task_state: TaskState, - /// There are N work queues, one per scheduler. - work_queue: deque::Worker>, - /// Work queues for the other schedulers. These are created by - /// cloning the core work queues. - work_queues: Vec>>, - /// The queue of incoming messages from other schedulers. - /// These are enqueued by SchedHandles after which a remote callback - /// is triggered to handle the message. - message_queue: msgq::Consumer, - /// Producer used to clone sched handles from - message_producer: msgq::Producer, - /// A shared list of sleeping schedulers. We'll use this to wake - /// up schedulers when pushing work onto the work queue. - sleeper_list: SleeperList, - /// Indicates that we have previously pushed a handle onto the - /// SleeperList but have not yet received the Wake message. - /// Being `true` does not necessarily mean that the scheduler is - /// not active since there are multiple event sources that may - /// wake the scheduler. It just prevents the scheduler from pushing - /// multiple handles onto the sleeper list. - sleepy: bool, - /// A flag to indicate we've received the shutdown message and should - /// no longer try to go to sleep, but exit instead. - no_sleep: bool, - /// The scheduler runs on a special task. When it is not running - /// it is stored here instead of the work queue. - sched_task: Option>, - /// An action performed after a context switch on behalf of the - /// code running before the context switch - cleanup_job: Option, - /// If the scheduler shouldn't run some tasks, a friend to send - /// them to. - friend_handle: Option, - /// Should this scheduler run any task, or only pinned tasks? - run_anything: bool, - /// A fast XorShift rng for scheduler use - rng: XorShiftRng, - /// A toggleable idle callback - idle_callback: Option>, - /// A countdown that starts at a random value and is decremented - /// every time a yield check is performed. When it hits 0 a task - /// will yield. - yield_check_count: uint, - /// A flag to tell the scheduler loop it needs to do some stealing - /// in order to introduce randomness as part of a yield - steal_for_yield: bool, - - // n.b. currently destructors of an object are run in top-to-bottom in order - // of field declaration. Due to its nature, the pausable idle callback - // must have some sort of handle to the event loop, so it needs to get - // destroyed before the event loop itself. For this reason, we destroy - // the event loop last to ensure that any unsafe references to it are - // destroyed before it's actually destroyed. - - /// The event loop used to drive the scheduler and perform I/O - pub event_loop: Box, -} - -/// An indication of how hard to work on a given operation, the difference -/// mainly being whether memory is synchronized or not -#[deriving(PartialEq)] -enum EffortLevel { - DontTryTooHard, - GiveItYourBest -} - -static MAX_YIELD_CHECKS: uint = 20000; - -fn reset_yield_check(rng: &mut XorShiftRng) -> uint { - let r: uint = Rand::rand(rng); - r % MAX_YIELD_CHECKS + 1 -} - -impl Scheduler { - - // * Initialization Functions - - pub fn new(pool_id: uint, - event_loop: Box, - work_queue: deque::Worker>, - work_queues: Vec>>, - sleeper_list: SleeperList, - state: TaskState) - -> Scheduler { - - Scheduler::new_special(pool_id, event_loop, work_queue, work_queues, - sleeper_list, true, None, state) - - } - - pub fn new_special(pool_id: uint, - event_loop: Box, - work_queue: deque::Worker>, - work_queues: Vec>>, - sleeper_list: SleeperList, - run_anything: bool, - friend: Option, - state: TaskState) - -> Scheduler { - - let (consumer, producer) = msgq::queue(); - let mut sched = Scheduler { - pool_id: pool_id, - sleeper_list: sleeper_list, - message_queue: consumer, - message_producer: producer, - sleepy: false, - no_sleep: false, - event_loop: event_loop, - work_queue: work_queue, - work_queues: work_queues, - stack_pool: StackPool::new(), - sched_task: None, - cleanup_job: None, - run_anything: run_anything, - friend_handle: friend, - rng: new_sched_rng(), - idle_callback: None, - yield_check_count: 0, - steal_for_yield: false, - task_state: state, - }; - - sched.yield_check_count = reset_yield_check(&mut sched.rng); - - return sched; - } - - // FIXME: This may eventually need to be refactored so that - // the scheduler itself doesn't have to call event_loop.run. - // That will be important for embedding the runtime into external - // event loops. - - // Take a main task to run, and a scheduler to run it in. Create a - // scheduler task and bootstrap into it. - pub fn bootstrap(mut self: Box) { - - // Build an Idle callback. - let cb = box SchedRunner as Box; - self.idle_callback = Some(self.event_loop.pausable_idle_callback(cb)); - - // Create a task for the scheduler with an empty context. - let sched_task = GreenTask::new_typed(Some(Coroutine::empty()), - TypeSched); - - // Before starting our first task, make sure the idle callback - // is active. As we do not start in the sleep state this is - // important. - self.idle_callback.as_mut().unwrap().resume(); - - // Now, as far as all the scheduler state is concerned, we are inside - // the "scheduler" context. The scheduler immediately hands over control - // to the event loop, and this will only exit once the event loop no - // longer has any references (handles or I/O objects). - rtdebug!("starting scheduler {}", self.sched_id()); - let mut sched_task = self.run(sched_task); - - // Close the idle callback. - let mut sched = sched_task.sched.take().unwrap(); - sched.idle_callback.take(); - // Make one go through the loop to run the close callback. - let mut stask = sched.run(sched_task); - - // Now that we are done with the scheduler, clean up the - // scheduler task. Do so by removing it from TLS and manually - // cleaning up the memory it uses. As we didn't actually call - // task.run() on the scheduler task we never get through all - // the cleanup code it runs. - rtdebug!("stopping scheduler {}", stask.sched.as_ref().unwrap().sched_id()); - - // Should not have any messages - let message = stask.sched.as_mut().unwrap().message_queue.pop(); - rtassert!(match message { msgq::Empty => true, _ => false }); - - stask.task.take().unwrap().drop(); - } - - // This does not return a scheduler, as the scheduler is placed - // inside the task. - pub fn run(mut self: Box, stask: Box) - -> Box { - - // This is unsafe because we need to place the scheduler, with - // the event_loop inside, inside our task. But we still need a - // mutable reference to the event_loop to give it the "run" - // command. - unsafe { - let event_loop: *mut Box = &mut self.event_loop; - // Our scheduler must be in the task before the event loop - // is started. - stask.put_with_sched(self); - (*event_loop).run(); - } - - // This is a serious code smell, but this function could be done away - // with if necessary. The ownership of `stask` was transferred into - // local storage just before the event loop ran, so it is possible to - // transmute `stask` as a uint across the running of the event loop to - // re-acquire ownership here. - // - // This would involve removing the Task from TLS, removing the runtime, - // forgetting the runtime, and then putting the task into `stask`. For - // now, because we have `GreenTask::convert`, I chose to take this - // method for cleanliness. This function is *not* a fundamental reason - // why this function should exist. - GreenTask::convert(Local::take()) - } - - // * Execution Functions - Core Loop Logic - - // This function is run from the idle callback on the uv loop, indicating - // that there are no I/O events pending. When this function returns, we will - // fall back to epoll() in the uv event loop, waiting for more things to - // happen. We may come right back off epoll() if the idle callback is still - // active, in which case we're truly just polling to see if I/O events are - // complete. - // - // The model for this function is to execute as much work as possible while - // still fairly considering I/O tasks. Falling back to epoll() frequently is - // often quite expensive, so we attempt to avoid it as much as possible. If - // we have any active I/O on the event loop, then we're forced to fall back - // to epoll() in order to provide fairness, but as long as we're doing work - // and there's no active I/O, we can continue to do work. - // - // If we try really hard to do some work, but no work is available to be - // done, then we fall back to epoll() to block this thread waiting for more - // work (instead of busy waiting). - fn run_sched_once(mut self: Box, stask: Box) { - // Make sure that we're not lying in that the `stask` argument is indeed - // the scheduler task for this scheduler. - assert!(self.sched_task.is_none()); - - // Assume that we need to continue idling unless we reach the - // end of this function without performing an action. - self.idle_callback.as_mut().unwrap().resume(); - - // First we check for scheduler messages, these are higher - // priority than regular tasks. - let (mut sched, mut stask, mut did_work) = - self.interpret_message_queue(stask, DontTryTooHard); - - // After processing a message, we consider doing some more work on the - // event loop. The "keep going" condition changes after the first - // iteration because we don't want to spin here infinitely. - // - // Once we start doing work we can keep doing work so long as the - // iteration does something. Note that we don't want to starve the - // message queue here, so each iteration when we're done working we - // check the message queue regardless of whether we did work or not. - let mut keep_going = !did_work || !sched.event_loop.has_active_io(); - while keep_going { - let (a, b, c) = match sched.do_work(stask) { - (sched, task, false) => { - sched.interpret_message_queue(task, GiveItYourBest) - } - (sched, task, true) => { - let (sched, task, _) = - sched.interpret_message_queue(task, GiveItYourBest); - (sched, task, true) - } - }; - sched = a; - stask = b; - did_work = c; - - // We only keep going if we managed to do something productive and - // also don't have any active I/O. If we didn't do anything, we - // should consider going to sleep, and if we have active I/O we need - // to poll for completion. - keep_going = did_work && !sched.event_loop.has_active_io(); - } - - // If we ever did some work, then we shouldn't put our scheduler - // entirely to sleep just yet. Leave the idle callback active and fall - // back to epoll() to see what's going on. - if did_work { - return stask.put_with_sched(sched); - } - - // If we got here then there was no work to do. - // Generate a SchedHandle and push it to the sleeper list so - // somebody can wake us up later. - if !sched.sleepy && !sched.no_sleep { - rtdebug!("scheduler has no work to do, going to sleep"); - sched.sleepy = true; - let handle = sched.make_handle(); - sched.sleeper_list.push(handle); - // Since we are sleeping, deactivate the idle callback. - sched.idle_callback.as_mut().unwrap().pause(); - } else { - rtdebug!("not sleeping, already doing so or no_sleep set"); - // We may not be sleeping, but we still need to deactivate - // the idle callback. - sched.idle_callback.as_mut().unwrap().pause(); - } - - // Finished a cycle without using the Scheduler. Place it back - // in TLS. - stask.put_with_sched(sched); - } - - // This function returns None if the scheduler is "used", or it - // returns the still-available scheduler. At this point all - // message-handling will count as a turn of work, and as a result - // return None. - fn interpret_message_queue(mut self: Box, - stask: Box, - effort: EffortLevel) - -> (Box, Box, bool) { - let msg = if effort == DontTryTooHard { - self.message_queue.casual_pop() - } else { - // When popping our message queue, we could see an "inconsistent" - // state which means that we *should* be able to pop data, but we - // are unable to at this time. Our options are: - // - // 1. Spin waiting for data - // 2. Ignore this and pretend we didn't find a message - // - // If we choose route 1, then if the pusher in question is currently - // pre-empted, we're going to take up our entire time slice just - // spinning on this queue. If we choose route 2, then the pusher in - // question is still guaranteed to make a send() on its async - // handle, so we will guaranteed wake up and see its message at some - // point. - // - // I have chosen to take route #2. - match self.message_queue.pop() { - msgq::Data(t) => Some(t), - msgq::Empty | msgq::Inconsistent => None - } - }; - - match msg { - Some(PinnedTask(task)) => { - let mut task = task; - task.give_home(HomeSched(self.make_handle())); - let (sched, task) = self.resume_task_immediately(stask, task); - (sched, task, true) - } - Some(TaskFromFriend(task)) => { - rtdebug!("got a task from a friend. lovely!"); - let (sched, task) = - self.process_task(stask, task, - Scheduler::resume_task_immediately_cl); - (sched, task, true) - } - Some(RunOnce(task)) => { - // bypass the process_task logic to force running this task once - // on this home scheduler. This is often used for I/O (homing). - let (sched, task) = self.resume_task_immediately(stask, task); - (sched, task, true) - } - Some(Wake) => { - self.sleepy = false; - (self, stask, true) - } - Some(Shutdown) => { - rtdebug!("shutting down"); - if self.sleepy { - // There may be an outstanding handle on the - // sleeper list. Pop them all to make sure that's - // not the case. - loop { - match self.sleeper_list.pop() { - Some(handle) => { - let mut handle = handle; - handle.send(Wake); - } - None => break - } - } - } - // No more sleeping. After there are no outstanding - // event loop references we will shut down. - self.no_sleep = true; - self.sleepy = false; - (self, stask, true) - } - Some(NewNeighbor(neighbor)) => { - self.work_queues.push(neighbor); - (self, stask, false) - } - None => (self, stask, false) - } - } - - fn do_work(mut self: Box, stask: Box) - -> (Box, Box, bool) { - rtdebug!("scheduler calling do work"); - match self.find_work() { - Some(task) => { - rtdebug!("found some work! running the task"); - let (sched, task) = - self.process_task(stask, task, - Scheduler::resume_task_immediately_cl); - (sched, task, true) - } - None => { - rtdebug!("no work was found, returning the scheduler struct"); - (self, stask, false) - } - } - } - - // Workstealing: In this iteration of the runtime each scheduler - // thread has a distinct work queue. When no work is available - // locally, make a few attempts to steal work from the queues of - // other scheduler threads. If a few steals fail we end up in the - // old "no work" path which is fine. - - // First step in the process is to find a task. This function does - // that by first checking the local queue, and if there is no work - // there, trying to steal from the remote work queues. - fn find_work(&mut self) -> Option> { - rtdebug!("scheduler looking for work"); - if !self.steal_for_yield { - match self.work_queue.pop() { - Some(task) => { - rtdebug!("found a task locally"); - return Some(task) - } - None => { - rtdebug!("scheduler trying to steal"); - return self.try_steals(); - } - } - } else { - // During execution of the last task, it performed a 'yield', - // so we're doing some work stealing in order to introduce some - // scheduling randomness. Otherwise we would just end up popping - // that same task again. This is pretty lame and is to work around - // the problem that work stealing is not designed for 'non-strict' - // (non-fork-join) task parallelism. - self.steal_for_yield = false; - match self.try_steals() { - Some(task) => { - rtdebug!("stole a task after yielding"); - return Some(task); - } - None => { - rtdebug!("did not steal a task after yielding"); - // Back to business - return self.find_work(); - } - } - } - } - - // Try stealing from all queues the scheduler knows about. This - // naive implementation can steal from our own queue or from other - // special schedulers. - fn try_steals(&mut self) -> Option> { - let work_queues = &mut self.work_queues; - let len = work_queues.len(); - let start_index = self.rng.gen_range(0, len); - for index in range(0, len).map(|i| (i + start_index) % len) { - match work_queues[index].steal() { - deque::Data(task) => { - rtdebug!("found task by stealing"); - return Some(task) - } - _ => () - } - }; - rtdebug!("giving up on stealing"); - return None; - } - - // * Task Routing Functions - Make sure tasks send up in the right - // place. - - fn process_task(mut self: Box, - cur: Box, - mut next: Box, - schedule_fn: SchedulingFn) - -> (Box, Box) { - rtdebug!("processing a task"); - - match next.take_unwrap_home() { - HomeSched(home_handle) => { - if home_handle.sched_id != self.sched_id() { - rtdebug!("sending task home"); - next.give_home(HomeSched(home_handle)); - Scheduler::send_task_home(next); - (self, cur) - } else { - rtdebug!("running task here"); - next.give_home(HomeSched(home_handle)); - schedule_fn(self, cur, next) - } - } - AnySched if self.run_anything => { - rtdebug!("running anysched task here"); - next.give_home(AnySched); - schedule_fn(self, cur, next) - } - AnySched => { - rtdebug!("sending task to friend"); - next.give_home(AnySched); - self.send_to_friend(next); - (self, cur) - } - } - } - - fn send_task_home(task: Box) { - let mut task = task; - match task.take_unwrap_home() { - HomeSched(mut home_handle) => home_handle.send(PinnedTask(task)), - AnySched => rtabort!("error: cannot send anysched task home"), - } - } - - /// Take a non-homed task we aren't allowed to run here and send - /// it to the designated friend scheduler to execute. - fn send_to_friend(&mut self, task: Box) { - rtdebug!("sending a task to friend"); - match self.friend_handle { - Some(ref mut handle) => { - handle.send(TaskFromFriend(task)); - } - None => { - rtabort!("tried to send task to a friend but scheduler has no friends"); - } - } - } - - /// Schedule a task to be executed later. - /// - /// Pushes the task onto the work stealing queue and tells the - /// event loop to run it later. Always use this instead of pushing - /// to the work queue directly. - pub fn enqueue_task(&mut self, task: Box) { - - // We push the task onto our local queue clone. - assert!(!task.is_sched()); - self.work_queue.push(task); - match self.idle_callback { - Some(ref mut idle) => idle.resume(), - None => {} // allow enqueuing before the scheduler starts - } - - // We've made work available. Notify a - // sleeping scheduler. - - match self.sleeper_list.casual_pop() { - Some(handle) => { - let mut handle = handle; - handle.send(Wake) - } - None => { (/* pass */) } - }; - } - - // * Core Context Switching Functions - - // The primary function for changing contexts. In the current - // design the scheduler is just a slightly modified GreenTask, so - // all context swaps are from GreenTask to GreenTask. The only difference - // between the various cases is where the inputs come from, and - // what is done with the resulting task. That is specified by the - // cleanup function f, which takes the scheduler and the - // old task as inputs. - - pub fn change_task_context(mut self: Box, - mut current_task: Box, - mut next_task: Box, - f: |&mut Scheduler, Box|) - -> Box { - let f_opaque = ClosureConverter::from_fn(f); - - let current_task_dupe = &mut *current_task as *mut GreenTask; - - // The current task is placed inside an enum with the cleanup - // function. This enum is then placed inside the scheduler. - self.cleanup_job = Some(CleanupJob::new(current_task, f_opaque)); - - // The scheduler is then placed inside the next task. - next_task.sched = Some(self); - - // However we still need an internal mutable pointer to the - // original task. The strategy here was "arrange memory, then - // get pointers", so we crawl back up the chain using - // transmute to eliminate borrowck errors. - unsafe { - - let sched: &mut Scheduler = - mem::transmute(&**next_task.sched.as_mut().unwrap()); - - let current_task: &mut GreenTask = match sched.cleanup_job { - Some(CleanupJob { ref mut task, .. }) => &mut **task, - None => rtabort!("no cleanup job") - }; - - let (current_task_context, next_task_context) = - Scheduler::get_contexts(current_task, &mut *next_task); - - // Done with everything - put the next task in TLS. This - // works because due to transmute the borrow checker - // believes that we have no internal pointers to - // next_task. - mem::forget(next_task); - - // The raw context swap operation. The next action taken - // will be running the cleanup job from the context of the - // next task. - Context::swap(current_task_context, next_task_context); - } - - // When the context swaps back to this task we immediately - // run the cleanup job, as expected by the previously called - // swap_contexts function. - let mut current_task: Box = unsafe { - mem::transmute(current_task_dupe) - }; - current_task.sched.as_mut().unwrap().run_cleanup_job(); - - // See the comments in switch_running_tasks_and_then for why a lock - // is acquired here. This is the resumption points and the "bounce" - // that it is referring to. - unsafe { - let _guard = current_task.nasty_deschedule_lock.lock(); - } - return current_task; - } - - // Returns a mutable reference to both contexts involved in this - // swap. This is unsafe - we are getting mutable internal - // references to keep even when we don't own the tasks. It looks - // kinda safe because we are doing transmutes before passing in - // the arguments. - pub fn get_contexts<'a>(current_task: &mut GreenTask, - next_task: &mut GreenTask) - -> (&'a mut Context, &'a mut Context) - { - let current_task_context = - &mut current_task.coroutine.as_mut().unwrap().saved_context; - let next_task_context = - &mut next_task.coroutine.as_mut().unwrap().saved_context; - unsafe { - (mem::transmute(current_task_context), - mem::transmute(next_task_context)) - } - } - - // * Context Swapping Helpers - Here be ugliness! - - pub fn resume_task_immediately(self: Box, - cur: Box, - next: Box) - -> (Box, Box) { - assert!(cur.is_sched()); - let mut cur = self.change_task_context(cur, next, |sched, stask| { - assert!(sched.sched_task.is_none()); - sched.sched_task = Some(stask); - }); - (cur.sched.take().unwrap(), cur) - } - - fn resume_task_immediately_cl(sched: Box, - cur: Box, - next: Box) - -> (Box, Box) { - sched.resume_task_immediately(cur, next) - } - - /// Block a running task, context switch to the scheduler, then pass the - /// blocked task to a closure. - /// - /// # Safety note - /// - /// The closure here is a *stack* closure that lives in the - /// running task. It gets transmuted to the scheduler's lifetime - /// and called while the task is blocked. - /// - /// This passes a Scheduler pointer to the fn after the context switch - /// in order to prevent that fn from performing further scheduling operations. - /// Doing further scheduling could easily result in infinite recursion. - /// - /// Note that if the closure provided relinquishes ownership of the - /// BlockedTask, then it is possible for the task to resume execution before - /// the closure has finished executing. This would naturally introduce a - /// race if the closure and task shared portions of the environment. - /// - /// This situation is currently prevented, or in other words it is - /// guaranteed that this function will not return before the given closure - /// has returned. - pub fn deschedule_running_task_and_then(mut self: Box, - cur: Box, - f: |&mut Scheduler, BlockedTask|) { - // Trickier - we need to get the scheduler task out of self - // and use it as the destination. - let stask = self.sched_task.take().unwrap(); - // Otherwise this is the same as below. - self.switch_running_tasks_and_then(cur, stask, f) - } - - pub fn switch_running_tasks_and_then(self: Box, - cur: Box, - next: Box, - f: |&mut Scheduler, BlockedTask|) { - // And here comes one of the sad moments in which a lock is used in a - // core portion of the rust runtime. As always, this is highly - // undesirable, so there's a good reason behind it. - // - // There is an excellent outline of the problem in issue #8132, and it's - // summarized in that `f` is executed on a sched task, but its - // environment is on the previous task. If `f` relinquishes ownership of - // the BlockedTask, then it may introduce a race where `f` is using the - // environment as well as the code after the 'deschedule' block. - // - // The solution we have chosen to adopt for now is to acquire a - // task-local lock around this block. The resumption of the task in - // context switching will bounce on the lock, thereby waiting for this - // block to finish, eliminating the race mentioned above. - // panic!("should never return!"); - // - // To actually maintain a handle to the lock, we use an unsafe pointer - // to it, but we're guaranteed that the task won't exit until we've - // unlocked the lock so there's no worry of this memory going away. - let cur = self.change_task_context(cur, next, |sched, mut task| { - let lock: *mut NativeMutex = &mut task.nasty_deschedule_lock; - unsafe { - let _guard = (*lock).lock(); - f(sched, BlockedTask::block(task.swap())); - } - }); - cur.put(); - } - - fn switch_task(sched: Box, - cur: Box, - next: Box) - -> (Box, Box) { - let mut cur = sched.change_task_context(cur, next, |sched, last_task| { - if last_task.is_sched() { - assert!(sched.sched_task.is_none()); - sched.sched_task = Some(last_task); - } else { - sched.enqueue_task(last_task); - } - }); - (cur.sched.take().unwrap(), cur) - } - - // * Task Context Helpers - - /// Called by a running task to end execution, after which it will - /// be recycled by the scheduler for reuse in a new task. - pub fn terminate_current_task(mut self: Box, - cur: Box) - -> ! { - // Similar to deschedule running task and then, but cannot go through - // the task-blocking path. The task is already dying. - let stask = self.sched_task.take().unwrap(); - let _cur = self.change_task_context(cur, stask, |sched, mut dead_task| { - let coroutine = dead_task.coroutine.take().unwrap(); - coroutine.recycle(&mut sched.stack_pool); - sched.task_state.decrement(); - }); - panic!("should never return!"); - } - - pub fn run_task(self: Box, - cur: Box, - next: Box) { - let (sched, task) = - self.process_task(cur, next, Scheduler::switch_task); - task.put_with_sched(sched); - } - - pub fn run_task_later(mut cur: Box, next: Box) { - let mut sched = cur.sched.take().unwrap(); - sched.enqueue_task(next); - cur.put_with_sched(sched); - } - - /// Yield control to the scheduler, executing another task. This is guaranteed - /// to introduce some amount of randomness to the scheduler. Currently the - /// randomness is a result of performing a round of work stealing (which - /// may end up stealing from the current scheduler). - pub fn yield_now(mut self: Box, cur: Box) { - // Async handles trigger the scheduler by calling yield_now on the local - // task, which eventually gets us to here. See comments in SchedRunner - // for more info on this. - if cur.is_sched() { - assert!(self.sched_task.is_none()); - self.run_sched_once(cur); - } else { - self.yield_check_count = reset_yield_check(&mut self.rng); - // Tell the scheduler to start stealing on the next iteration - self.steal_for_yield = true; - let stask = self.sched_task.take().unwrap(); - let cur = self.change_task_context(cur, stask, |sched, task| { - sched.enqueue_task(task); - }); - cur.put() - } - } - - pub fn maybe_yield(mut self: Box, cur: Box) { - // It's possible for sched tasks to possibly call this function, and it - // just means that they're likely sending on channels (which - // occasionally call this function). Sched tasks follow different paths - // when executing yield_now(), which may possibly trip the assertion - // below. For this reason, we just have sched tasks bail out soon. - // - // Sched tasks have no need to yield anyway because as soon as they - // return they'll yield to other threads by falling back to the event - // loop. Additionally, we completely control sched tasks, so we can make - // sure that they never execute more than enough code. - if cur.is_sched() { - return cur.put_with_sched(self) - } - - // The number of times to do the yield check before yielding, chosen - // arbitrarily. - rtassert!(self.yield_check_count > 0); - self.yield_check_count -= 1; - if self.yield_check_count == 0 { - self.yield_now(cur); - } else { - cur.put_with_sched(self); - } - } - - - // * Utility Functions - - pub fn sched_id(&self) -> uint { self as *const Scheduler as uint } - - pub fn run_cleanup_job(&mut self) { - let cleanup_job = self.cleanup_job.take().unwrap(); - cleanup_job.run(self) - } - - pub fn make_handle(&mut self) -> SchedHandle { - let remote = self.event_loop.remote_callback(box SchedRunner); - - return SchedHandle { - remote: remote, - queue: self.message_producer.clone(), - sched_id: self.sched_id() - } - } -} - -// Supporting types - -type SchedulingFn = fn(Box, Box, Box) - -> (Box, Box); - -pub enum SchedMessage { - Wake, - Shutdown, - NewNeighbor(deque::Stealer>), - PinnedTask(Box), - TaskFromFriend(Box), - RunOnce(Box), -} - -pub struct SchedHandle { - remote: Box, - queue: msgq::Producer, - pub sched_id: uint -} - -impl SchedHandle { - pub fn send(&mut self, msg: SchedMessage) { - self.queue.push(msg); - self.remote.fire(); - } -} - -struct SchedRunner; - -impl Callback for SchedRunner { - fn call(&mut self) { - // In theory, this function needs to invoke the `run_sched_once` - // function on the scheduler. Sadly, we have no context here, except for - // knowledge of the local `Task`. In order to avoid a call to - // `GreenTask::convert`, we just call `yield_now` and the scheduler will - // detect when a sched task performs a yield vs a green task performing - // a yield (and act accordingly). - // - // This function could be converted to `GreenTask::convert` if - // absolutely necessary, but for cleanliness it is much better to not - // use the conversion function. - let task: Box = Local::take(); - task.yield_now(); - } -} - -struct CleanupJob { - task: Box, - f: UnsafeTaskReceiver -} - -impl CleanupJob { - pub fn new(task: Box, f: UnsafeTaskReceiver) -> CleanupJob { - CleanupJob { - task: task, - f: f - } - } - - pub fn run(self, sched: &mut Scheduler) { - let CleanupJob { task, f } = self; - f.to_fn()(sched, task) - } -} - -// FIXME: Some hacks to put a || closure in Scheduler without borrowck -// complaining -type UnsafeTaskReceiver = raw::Closure; -trait ClosureConverter { - fn from_fn(|&mut Scheduler, Box|) -> Self; - fn to_fn(self) -> |&mut Scheduler, Box|:'static ; -} -impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: |&mut Scheduler, Box|) -> UnsafeTaskReceiver { - unsafe { mem::transmute(f) } - } - fn to_fn(self) -> |&mut Scheduler, Box|:'static { - unsafe { mem::transmute(self) } - } -} - -// On unix, we read randomness straight from /dev/urandom, but the -// default constructor of an XorShiftRng does this via io::fs, which -// relies on the scheduler existing, so we have to manually load -// randomness. Windows has its own C API for this, so we don't need to -// worry there. -#[cfg(windows)] -fn new_sched_rng() -> XorShiftRng { - use std::rand::OsRng; - match OsRng::new() { - Ok(mut r) => r.gen(), - Err(e) => { - rtabort!("sched: failed to create seeded RNG: {}", e) - } - } -} -#[cfg(unix)] -fn new_sched_rng() -> XorShiftRng { - use libc; - use std::mem; - use std::rand::SeedableRng; - - let fd = "/dev/urandom".with_c_str(|name| { - unsafe { libc::open(name, libc::O_RDONLY, 0) } - }); - if fd == -1 { - rtabort!("could not open /dev/urandom for reading.") - } - - let mut seeds = [0u32, .. 4]; - let size = mem::size_of_val(&seeds); - loop { - let nbytes = unsafe { - libc::read(fd, - seeds.as_mut_ptr() as *mut libc::c_void, - size as libc::size_t) - }; - rtassert!(nbytes as uint == size); - - if !seeds.iter().all(|x| *x == 0) { - break; - } - } - - unsafe {libc::close(fd);} - - SeedableRng::from_seed(seeds) -} - -#[cfg(test)] -mod test { - use std::rt::task::TaskOpts; - use std::rt::task::Task; - use std::rt::local::Local; - - use {TaskState, PoolConfig, SchedPool}; - use basic; - use sched::{TaskFromFriend, PinnedTask}; - use task::{GreenTask, HomeSched, AnySched}; - - fn pool() -> SchedPool { - SchedPool::new(PoolConfig { - threads: 1, - event_loop_factory: basic::event_loop, - }) - } - - fn run(f: proc():Send) { - let mut pool = pool(); - pool.spawn(TaskOpts::new(), f); - pool.shutdown(); - } - - fn sched_id() -> uint { - let mut task = Local::borrow(None::); - match task.maybe_take_runtime::() { - Some(green) => { - let ret = green.sched.as_ref().unwrap().sched_id(); - task.put_runtime(green); - return ret; - } - None => panic!() - } - } - - #[test] - fn trivial_run_in_newsched_task_test() { - let mut task_ran = false; - let task_ran_ptr: *mut bool = &mut task_ran; - run(proc() { - unsafe { *task_ran_ptr = true }; - rtdebug!("executed from the new scheduler") - }); - assert!(task_ran); - } - - #[test] - fn multiple_task_test() { - let total = 10; - let mut task_run_count = 0; - let task_run_count_ptr: *mut uint = &mut task_run_count; - // with only one thread this is safe to run in without worries of - // contention. - run(proc() { - for _ in range(0u, total) { - spawn(proc() { - unsafe { *task_run_count_ptr = *task_run_count_ptr + 1}; - }); - } - }); - assert!(task_run_count == total); - } - - #[test] - fn multiple_task_nested_test() { - let mut task_run_count = 0; - let task_run_count_ptr: *mut uint = &mut task_run_count; - run(proc() { - spawn(proc() { - unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; - spawn(proc() { - unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; - spawn(proc() { - unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; - }) - }) - }) - }); - assert!(task_run_count == 3); - } - - // A very simple test that confirms that a task executing on the - // home scheduler notices that it is home. - #[test] - fn test_home_sched() { - let mut pool = pool(); - - let (dtx, drx) = channel(); - { - let (tx, rx) = channel(); - let mut handle1 = pool.spawn_sched(); - let mut handle2 = pool.spawn_sched(); - - handle1.send(TaskFromFriend(pool.task(TaskOpts::new(), proc() { - tx.send(sched_id()); - }))); - let sched1_id = rx.recv(); - - let mut task = pool.task(TaskOpts::new(), proc() { - assert_eq!(sched_id(), sched1_id); - dtx.send(()); - }); - task.give_home(HomeSched(handle1)); - handle2.send(TaskFromFriend(task)); - } - drx.recv(); - - pool.shutdown(); - } - - // An advanced test that checks all four possible states that a - // (task,sched) can be in regarding homes. - - #[test] - fn test_schedule_home_states() { - use sleeper_list::SleeperList; - use super::{Shutdown, Scheduler, SchedHandle}; - use std::rt::thread::Thread; - use std::sync::deque::BufferPool; - - Thread::start(proc() { - let sleepers = SleeperList::new(); - let pool = BufferPool::new(); - let (normal_worker, normal_stealer) = pool.deque(); - let (special_worker, special_stealer) = pool.deque(); - let queues = vec![normal_stealer, special_stealer]; - let (_p, state) = TaskState::new(); - - // Our normal scheduler - let mut normal_sched = box Scheduler::new( - 1, - basic::event_loop(), - normal_worker, - queues.clone(), - sleepers.clone(), - state.clone()); - - let normal_handle = normal_sched.make_handle(); - let friend_handle = normal_sched.make_handle(); - - // Our special scheduler - let mut special_sched = box Scheduler::new_special( - 1, - basic::event_loop(), - special_worker, - queues.clone(), - sleepers.clone(), - false, - Some(friend_handle), - state); - - let special_handle = special_sched.make_handle(); - - let t1_handle = special_sched.make_handle(); - let t4_handle = special_sched.make_handle(); - - // Four test tasks: - // 1) task is home on special - // 2) task not homed, sched doesn't care - // 3) task not homed, sched requeues - // 4) task not home, send home - - // Grab both the scheduler and the task from TLS and check if the - // task is executing on an appropriate scheduler. - fn on_appropriate_sched() -> bool { - use task::{TypeGreen, TypeSched, HomeSched}; - let task = GreenTask::convert(Local::take()); - let sched_id = task.sched.as_ref().unwrap().sched_id(); - let run_any = task.sched.as_ref().unwrap().run_anything; - let ret = match task.task_type { - TypeGreen(Some(AnySched)) => { - run_any - } - TypeGreen(Some(HomeSched(SchedHandle { - sched_id: ref id, - .. - }))) => { - *id == sched_id - } - TypeGreen(None) => { panic!("task without home"); } - TypeSched => { panic!("expected green task"); } - }; - task.put(); - ret - } - - let task1 = GreenTask::new_homed(&mut special_sched.stack_pool, - None, HomeSched(t1_handle), proc() { - rtassert!(on_appropriate_sched()); - }); - - let task2 = GreenTask::new(&mut normal_sched.stack_pool, None, proc() { - rtassert!(on_appropriate_sched()); - }); - - let task3 = GreenTask::new(&mut normal_sched.stack_pool, None, proc() { - rtassert!(on_appropriate_sched()); - }); - - let task4 = GreenTask::new_homed(&mut special_sched.stack_pool, - None, HomeSched(t4_handle), proc() { - rtassert!(on_appropriate_sched()); - }); - - // Signal from the special task that we are done. - let (tx, rx) = channel::<()>(); - - fn run(next: Box) { - let mut task = GreenTask::convert(Local::take()); - let sched = task.sched.take().unwrap(); - sched.run_task(task, next) - } - - let normal_task = GreenTask::new(&mut normal_sched.stack_pool, None, proc() { - run(task2); - run(task4); - rx.recv(); - let mut nh = normal_handle; - nh.send(Shutdown); - let mut sh = special_handle; - sh.send(Shutdown); - }); - normal_sched.enqueue_task(normal_task); - - let special_task = GreenTask::new(&mut special_sched.stack_pool, None, proc() { - run(task1); - run(task3); - tx.send(()); - }); - special_sched.enqueue_task(special_task); - - let normal_sched = normal_sched; - let normal_thread = Thread::start(proc() { normal_sched.bootstrap() }); - - let special_sched = special_sched; - let special_thread = Thread::start(proc() { special_sched.bootstrap() }); - - normal_thread.join(); - special_thread.join(); - }).join(); - } - - //#[test] - //fn test_stress_schedule_task_states() { - // if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - // let n = stress_factor() * 120; - // for _ in range(0, n as int) { - // test_schedule_home_states(); - // } - //} - - #[test] - fn wakeup_across_scheds() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - - let mut pool1 = pool(); - let mut pool2 = pool(); - - pool1.spawn(TaskOpts::new(), proc() { - let id = sched_id(); - tx1.send(()); - rx2.recv(); - assert_eq!(id, sched_id()); - }); - - pool2.spawn(TaskOpts::new(), proc() { - let id = sched_id(); - rx1.recv(); - assert_eq!(id, sched_id()); - tx2.send(()); - }); - - pool1.shutdown(); - pool2.shutdown(); - } - - // A regression test that the final message is always handled. - // Used to deadlock because Shutdown was never recvd. - #[test] - fn no_missed_messages() { - let mut pool = pool(); - - let task = pool.task(TaskOpts::new(), proc()()); - pool.spawn_sched().send(TaskFromFriend(task)); - - pool.shutdown(); - } - - #[test] - fn multithreading() { - run(proc() { - let mut rxs = vec![]; - for _ in range(0u, 10) { - let (tx, rx) = channel(); - spawn(proc() { - tx.send(()); - }); - rxs.push(rx); - } - - loop { - match rxs.pop() { - Some(rx) => rx.recv(), - None => break, - } - } - }); - } - - #[test] - fn thread_ring() { - run(proc() { - let (end_tx, end_rx) = channel(); - - let n_tasks = 10; - let token = 2000; - - let (tx1, mut rx) = channel(); - tx1.send((token, end_tx)); - let mut i = 2; - while i <= n_tasks { - let (tx, next_rx) = channel(); - let imm_i = i; - let imm_rx = rx; - spawn(proc() { - roundtrip(imm_i, n_tasks, &imm_rx, &tx); - }); - rx = next_rx; - i += 1; - } - let rx = rx; - spawn(proc() { - roundtrip(1, n_tasks, &rx, &tx1); - }); - - end_rx.recv(); - }); - - fn roundtrip(id: int, n_tasks: int, - rx: &Receiver<(int, Sender<()>)>, - tx: &Sender<(int, Sender<()>)>) { - loop { - match rx.recv() { - (1, end_tx) => { - debug!("{}\n", id); - end_tx.send(()); - return; - } - (token, end_tx) => { - debug!("thread: {} got token: {}", id, token); - tx.send((token - 1, end_tx)); - if token <= n_tasks { - return; - } - } - } - } - } - } - - #[test] - fn start_closure_dtor() { - // Regression test that the `start` task entrypoint can - // contain dtors that use task resources - run(proc() { - #[allow(dead_code)] - struct S { field: () } - - impl Drop for S { - fn drop(&mut self) { - let _foo = box 0i; - } - } - - let s = S { field: () }; - - spawn(proc() { - let _ss = &s; - }); - }); - } - - #[test] - fn dont_starve_1() { - let mut pool = SchedPool::new(PoolConfig { - threads: 2, // this must be > 1 - event_loop_factory: basic::event_loop, - }); - pool.spawn(TaskOpts::new(), proc() { - let (tx, rx) = channel(); - - // This task should not be able to starve the sender; - // The sender should get stolen to another thread. - spawn(proc() { - while rx.try_recv().is_err() { } - }); - - tx.send(()); - }); - pool.shutdown(); - } - - #[test] - fn dont_starve_2() { - run(proc() { - let (tx1, rx1) = channel(); - let (tx2, _rx2) = channel(); - - // This task should not be able to starve the other task. - // The sends should eventually yield. - spawn(proc() { - while rx1.try_recv().is_err() { - tx2.send(()); - } - }); - - tx1.send(()); - }); - } - - // Regression test for a logic bug that would cause single-threaded - // schedulers to sleep forever after yielding and stealing another task. - #[test] - fn single_threaded_yield() { - use std::task::deschedule; - run(proc() { - for _ in range(0u, 5) { deschedule(); } - }); - } - - #[test] - fn test_spawn_sched_blocking() { - use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; - static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; - - // Testing that a task in one scheduler can block in foreign code - // without affecting other schedulers - for _ in range(0u, 20) { - let mut pool = pool(); - let (start_tx, start_rx) = channel(); - let (fin_tx, fin_rx) = channel(); - - let mut handle = pool.spawn_sched(); - handle.send(PinnedTask(pool.task(TaskOpts::new(), proc() { - unsafe { - let guard = LOCK.lock(); - - start_tx.send(()); - guard.wait(); // block the scheduler thread - guard.signal(); // let them know we have the lock - } - - fin_tx.send(()); - }))); - drop(handle); - - let mut handle = pool.spawn_sched(); - handle.send(PinnedTask(pool.task(TaskOpts::new(), proc() { - // Wait until the other task has its lock - start_rx.recv(); - - fn pingpong(po: &Receiver, ch: &Sender) { - let mut val = 20; - while val > 0 { - val = po.recv(); - let _ = ch.send_opt(val - 1); - } - } - - let (setup_tx, setup_rx) = channel(); - let (parent_tx, parent_rx) = channel(); - spawn(proc() { - let (child_tx, child_rx) = channel(); - setup_tx.send(child_tx); - pingpong(&child_rx, &parent_tx); - }); - - let child_tx = setup_rx.recv(); - child_tx.send(20); - pingpong(&parent_rx, &child_tx); - unsafe { - let guard = LOCK.lock(); - guard.signal(); // wakeup waiting scheduler - guard.wait(); // wait for them to grab the lock - } - }))); - drop(handle); - - fin_rx.recv(); - pool.shutdown(); - } - unsafe { LOCK.destroy(); } - } -} diff --git a/src/libgreen/simple.rs b/src/libgreen/simple.rs deleted file mode 100644 index e26a099c0282561f0a19628f1f0df3cbbdf504f3..0000000000000000000000000000000000000000 --- a/src/libgreen/simple.rs +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! A small module implementing a simple "runtime" used for bootstrapping a rust -//! scheduler pool and then interacting with it. - -use std::any::Any; -use std::mem; -use std::rt::Runtime; -use std::rt::local::Local; -use std::rt::mutex::NativeMutex; -use std::rt::task::{Task, BlockedTask, TaskOpts}; - -struct SimpleTask { - lock: NativeMutex, - awoken: bool, -} - -impl Runtime for SimpleTask { - // Implement the simple tasks of descheduling and rescheduling, but only in - // a simple number of cases. - fn deschedule(mut self: Box, - times: uint, - mut cur_task: Box, - f: |BlockedTask| -> Result<(), BlockedTask>) { - assert!(times == 1); - - let me = &mut *self as *mut SimpleTask; - let cur_dupe = &mut *cur_task as *mut Task; - cur_task.put_runtime(self); - let task = BlockedTask::block(cur_task); - - // See libnative/task.rs for what's going on here with the `awoken` - // field and the while loop around wait() - unsafe { - let guard = (*me).lock.lock(); - (*me).awoken = false; - match f(task) { - Ok(()) => { - while !(*me).awoken { - guard.wait(); - } - } - Err(task) => { mem::forget(task.wake()); } - } - drop(guard); - cur_task = mem::transmute(cur_dupe); - } - Local::put(cur_task); - } - fn reawaken(mut self: Box, mut to_wake: Box) { - let me = &mut *self as *mut SimpleTask; - to_wake.put_runtime(self); - unsafe { - mem::forget(to_wake); - let guard = (*me).lock.lock(); - (*me).awoken = true; - guard.signal(); - } - } - - // These functions are all unimplemented and panic as a result. This is on - // purpose. A "simple task" is just that, a very simple task that can't - // really do a whole lot. The only purpose of the task is to get us off our - // feet and running. - fn yield_now(self: Box, _cur_task: Box) { panic!() } - fn maybe_yield(self: Box, _cur_task: Box) { panic!() } - fn spawn_sibling(self: Box, - _cur_task: Box, - _opts: TaskOpts, - _f: proc():Send) { - panic!() - } - - fn stack_bounds(&self) -> (uint, uint) { panic!() } - fn stack_guard(&self) -> Option { panic!() } - - fn can_block(&self) -> bool { true } - fn wrap(self: Box) -> Box { panic!() } -} - -pub fn task() -> Box { - let mut task = box Task::new(); - task.put_runtime(box SimpleTask { - lock: unsafe {NativeMutex::new()}, - awoken: false, - }); - return task; -} diff --git a/src/libgreen/sleeper_list.rs b/src/libgreen/sleeper_list.rs deleted file mode 100644 index 5df866955e656101470378a26a3bcf5edb1f60b3..0000000000000000000000000000000000000000 --- a/src/libgreen/sleeper_list.rs +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Maintains a shared list of sleeping schedulers. Schedulers -//! use this to wake each other up. - -use std::sync::mpmc_bounded_queue::Queue; - -use sched::SchedHandle; - -pub struct SleeperList { - q: Queue, -} - -impl SleeperList { - pub fn new() -> SleeperList { - SleeperList{q: Queue::with_capacity(8*1024)} - } - - pub fn push(&mut self, value: SchedHandle) { - assert!(self.q.push(value)) - } - - pub fn pop(&mut self) -> Option { - self.q.pop() - } - - pub fn casual_pop(&mut self) -> Option { - self.q.pop() - } -} - -impl Clone for SleeperList { - fn clone(&self) -> SleeperList { - SleeperList { - q: self.q.clone() - } - } -} diff --git a/src/libgreen/stack.rs b/src/libgreen/stack.rs deleted file mode 100644 index 81e6152b3d7c3802499a7f9c11e8e8849b0af6f7..0000000000000000000000000000000000000000 --- a/src/libgreen/stack.rs +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use std::ptr; -use std::sync::atomic; -use std::os::{errno, page_size, MemoryMap, MapReadable, MapWritable, - MapNonStandardFlags, getenv}; -use libc; - -/// A task's stack. The name "Stack" is a vestige of segmented stacks. -pub struct Stack { - buf: Option, - min_size: uint, - valgrind_id: libc::c_uint, -} - -// Try to use MAP_STACK on platforms that support it (it's what we're doing -// anyway), but some platforms don't support it at all. For example, it appears -// that there's a bug in freebsd that MAP_STACK implies MAP_FIXED (so it always -// panics): http://lists.freebsd.org/pipermail/freebsd-bugs/2011-July/044840.html -// -// DragonFly BSD also seems to suffer from the same problem. When MAP_STACK is -// used, it returns the same `ptr` multiple times. -#[cfg(not(any(windows, target_os = "freebsd", target_os = "dragonfly")))] -static STACK_FLAGS: libc::c_int = libc::MAP_STACK | libc::MAP_PRIVATE | - libc::MAP_ANON; -#[cfg(any(target_os = "freebsd", target_os = "dragonfly"))] -static STACK_FLAGS: libc::c_int = libc::MAP_PRIVATE | libc::MAP_ANON; -#[cfg(windows)] -static STACK_FLAGS: libc::c_int = 0; - -impl Stack { - /// Allocate a new stack of `size`. If size = 0, this will panic. Use - /// `dummy_stack` if you want a zero-sized stack. - pub fn new(size: uint) -> Stack { - // Map in a stack. Eventually we might be able to handle stack - // allocation failure, which would fail to spawn the task. But there's - // not many sensible things to do on OOM. Panic seems fine (and is - // what the old stack allocation did). - let stack = match MemoryMap::new(size, &[MapReadable, MapWritable, - MapNonStandardFlags(STACK_FLAGS)]) { - Ok(map) => map, - Err(e) => panic!("mmap for stack of size {} failed: {}", size, e) - }; - - // Change the last page to be inaccessible. This is to provide safety; - // when an FFI function overflows it will (hopefully) hit this guard - // page. It isn't guaranteed, but that's why FFI is unsafe. buf.data is - // guaranteed to be aligned properly. - if !protect_last_page(&stack) { - panic!("Could not memory-protect guard page. stack={}, errno={}", - stack.data(), errno()); - } - - let mut stk = Stack { - buf: Some(stack), - min_size: size, - valgrind_id: 0 - }; - - // FIXME: Using the FFI to call a C macro. Slow - stk.valgrind_id = unsafe { - rust_valgrind_stack_register(stk.start() as *const libc::uintptr_t, - stk.end() as *const libc::uintptr_t) - }; - return stk; - } - - /// Create a 0-length stack which starts (and ends) at 0. - pub unsafe fn dummy_stack() -> Stack { - Stack { - buf: None, - min_size: 0, - valgrind_id: 0 - } - } - - /// Point to the last writable byte of the stack - pub fn guard(&self) -> *const uint { - (self.start() as uint + page_size()) as *const uint - } - - /// Point to the low end of the allocated stack - pub fn start(&self) -> *const uint { - self.buf.as_ref().map(|m| m.data() as *const uint) - .unwrap_or(ptr::null()) - } - - /// Point one uint beyond the high end of the allocated stack - pub fn end(&self) -> *const uint { - self.buf.as_ref().map(|buf| unsafe { - buf.data().offset(buf.len() as int) as *const uint - }).unwrap_or(ptr::null()) - } -} - -#[cfg(unix)] -fn protect_last_page(stack: &MemoryMap) -> bool { - unsafe { - // This may seem backwards: the start of the segment is the last page? - // Yes! The stack grows from higher addresses (the end of the allocated - // block) to lower addresses (the start of the allocated block). - let last_page = stack.data() as *mut libc::c_void; - libc::mprotect(last_page, page_size() as libc::size_t, - libc::PROT_NONE) != -1 - } -} - -#[cfg(windows)] -fn protect_last_page(stack: &MemoryMap) -> bool { - unsafe { - // see above - let last_page = stack.data() as *mut libc::c_void; - let mut old_prot: libc::DWORD = 0; - libc::VirtualProtect(last_page, page_size() as libc::SIZE_T, - libc::PAGE_NOACCESS, - &mut old_prot as libc::LPDWORD) != 0 - } -} - -impl Drop for Stack { - fn drop(&mut self) { - unsafe { - // FIXME: Using the FFI to call a C macro. Slow - rust_valgrind_stack_deregister(self.valgrind_id); - } - } -} - -pub struct StackPool { - // Ideally this would be some data structure that preserved ordering on - // Stack.min_size. - stacks: Vec, -} - -impl StackPool { - pub fn new() -> StackPool { - StackPool { - stacks: vec![], - } - } - - pub fn take_stack(&mut self, min_size: uint) -> Stack { - // Ideally this would be a binary search - match self.stacks.iter().position(|s| min_size <= s.min_size) { - Some(idx) => self.stacks.swap_remove(idx).unwrap(), - None => Stack::new(min_size) - } - } - - pub fn give_stack(&mut self, stack: Stack) { - if self.stacks.len() <= max_cached_stacks() { - self.stacks.push(stack) - } - } -} - -fn max_cached_stacks() -> uint { - static AMT: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; - match AMT.load(atomic::SeqCst) { - 0 => {} - n => return n - 1, - } - let amt = getenv("RUST_MAX_CACHED_STACKS").and_then(|s| from_str(s.as_slice())); - // This default corresponds to 20M of cache per scheduler (at the - // default size). - let amt = amt.unwrap_or(10); - // 0 is our sentinel value, so ensure that we'll never see 0 after - // initialization has run - AMT.store(amt + 1, atomic::SeqCst); - return amt; -} - -extern { - fn rust_valgrind_stack_register(start: *const libc::uintptr_t, - end: *const libc::uintptr_t) -> libc::c_uint; - fn rust_valgrind_stack_deregister(id: libc::c_uint); -} - -#[cfg(test)] -mod tests { - use super::StackPool; - - #[test] - fn stack_pool_caches() { - let mut p = StackPool::new(); - let s = p.take_stack(10); - p.give_stack(s); - let s = p.take_stack(4); - assert_eq!(s.min_size, 10); - p.give_stack(s); - let s = p.take_stack(14); - assert_eq!(s.min_size, 14); - p.give_stack(s); - } - - #[test] - fn stack_pool_caches_exact() { - let mut p = StackPool::new(); - let mut s = p.take_stack(10); - s.valgrind_id = 100; - p.give_stack(s); - - let s = p.take_stack(10); - assert_eq!(s.min_size, 10); - assert_eq!(s.valgrind_id, 100); - } -} diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs deleted file mode 100644 index e159c153bc38c73a576db94442011dc53574129d..0000000000000000000000000000000000000000 --- a/src/libgreen/task.rs +++ /dev/null @@ -1,602 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! The Green Task implementation -//! -//! This module contains the glue to the libstd runtime necessary to integrate -//! M:N scheduling. This GreenTask structure is hidden as a trait object in all -//! rust tasks and virtual calls are made in order to interface with it. -//! -//! Each green task contains a scheduler if it is currently running, and it also -//! contains the rust task itself in order to juggle around ownership of the -//! values. - -pub use self::TaskType::*; -pub use self::Home::*; - -use std::any::Any; -use std::mem; -use std::raw; -use std::rt::Runtime; -use std::rt::local::Local; -use std::rt::mutex::NativeMutex; -use std::rt::stack; -use std::rt::task::{Task, BlockedTask, TaskOpts}; -use std::rt; - -use context::Context; -use coroutine::Coroutine; -use sched::{Scheduler, SchedHandle, RunOnce}; -use stack::StackPool; - -/// The necessary fields needed to keep track of a green task (as opposed to a -/// 1:1 task). -pub struct GreenTask { - /// Coroutine that this task is running on, otherwise known as the register - /// context and the stack that this task owns. This field is optional to - /// relinquish ownership back to a scheduler to recycle stacks at a later - /// date. - pub coroutine: Option, - - /// Optional handle back into the home sched pool of this task. This field - /// is lazily initialized. - pub handle: Option, - - /// Slot for maintaining ownership of a scheduler. If a task is running, - /// this value will be Some(sched) where the task is running on "sched". - pub sched: Option>, - - /// Temporary ownership slot of a std::rt::task::Task object. This is used - /// to squirrel that libstd task away while we're performing green task - /// operations. - pub task: Option>, - - /// Dictates whether this is a sched task or a normal green task - pub task_type: TaskType, - - /// Home pool that this task was spawned into. This field is lazily - /// initialized until when the task is initially scheduled, and is used to - /// make sure that tasks are always woken up in the correct pool of - /// schedulers. - pub pool_id: uint, - - // See the comments in the scheduler about why this is necessary - pub nasty_deschedule_lock: NativeMutex, -} - -pub enum TaskType { - TypeGreen(Option), - TypeSched, -} - -pub enum Home { - AnySched, - HomeSched(SchedHandle), -} - -/// Trampoline code for all new green tasks which are running around. This -/// function is passed through to Context::new as the initial rust landing pad -/// for all green tasks. This code is actually called after the initial context -/// switch onto a green thread. -/// -/// The first argument to this function is the `Box` pointer, and -/// the next two arguments are the user-provided procedure for running code. -/// -/// The goal for having this weird-looking function is to reduce the number of -/// allocations done on a green-task startup as much as possible. -extern fn bootstrap_green_task(task: uint, code: *mut (), env: *mut ()) -> ! { - // Acquire ownership of the `proc()` - let start: proc() = unsafe { - mem::transmute(raw::Procedure { code: code, env: env }) - }; - - // Acquire ownership of the `Box` - let mut task: Box = unsafe { mem::transmute(task) }; - - // First code after swap to this new context. Run our cleanup job - task.pool_id = { - let sched = task.sched.as_mut().unwrap(); - sched.run_cleanup_job(); - sched.task_state.increment(); - sched.pool_id - }; - - // Convert our green task to a libstd task and then execute the code - // requested. This is the "try/catch" block for this green task and - // is the wrapper for *all* code run in the task. - let mut start = Some(start); - let task = task.swap().run(|| start.take().unwrap()()).destroy(); - - // Once the function has exited, it's time to run the termination - // routine. This means we need to context switch one more time but - // clean ourselves up on the other end. Since we have no way of - // preserving a handle to the GreenTask down to this point, this - // unfortunately must call `GreenTask::convert`. In order to avoid - // this we could add a `terminate` function to the `Runtime` trait - // in libstd, but that seems less appropriate since the conversion - // method exists. - GreenTask::convert(task).terminate(); -} - -impl GreenTask { - /// Creates a new green task which is not homed to any particular scheduler - /// and will not have any contained Task structure. - pub fn new(stack_pool: &mut StackPool, - stack_size: Option, - start: proc():Send) -> Box { - GreenTask::new_homed(stack_pool, stack_size, AnySched, start) - } - - /// Creates a new task (like `new`), but specifies the home for new task. - pub fn new_homed(stack_pool: &mut StackPool, - stack_size: Option, - home: Home, - start: proc():Send) -> Box { - // Allocate ourselves a GreenTask structure - let mut ops = GreenTask::new_typed(None, TypeGreen(Some(home))); - - // Allocate a stack for us to run on - let stack_size = stack_size.unwrap_or_else(|| rt::min_stack()); - let mut stack = stack_pool.take_stack(stack_size); - let context = Context::new(bootstrap_green_task, ops.as_uint(), start, - &mut stack); - - // Package everything up in a coroutine and return - ops.coroutine = Some(Coroutine { - current_stack_segment: stack, - saved_context: context, - }); - return ops; - } - - /// Creates a new green task with the specified coroutine and type, this is - /// useful when creating scheduler tasks. - pub fn new_typed(coroutine: Option, - task_type: TaskType) -> Box { - box GreenTask { - pool_id: 0, - coroutine: coroutine, - task_type: task_type, - sched: None, - handle: None, - nasty_deschedule_lock: unsafe { NativeMutex::new() }, - task: Some(box Task::new()), - } - } - - /// Creates a new green task with the given configuration options for the - /// contained Task object. The given stack pool is also used to allocate a - /// new stack for this task. - pub fn configure(pool: &mut StackPool, - opts: TaskOpts, - f: proc():Send) -> Box { - let TaskOpts { name, stack_size, on_exit } = opts; - - let mut green = GreenTask::new(pool, stack_size, f); - { - let task = green.task.as_mut().unwrap(); - task.name = name; - task.death.on_exit = on_exit; - } - return green; - } - - /// Just like the `maybe_take_runtime` function, this function should *not* - /// exist. Usage of this function is _strongly_ discouraged. This is an - /// absolute last resort necessary for converting a libstd task to a green - /// task. - /// - /// This function will assert that the task is indeed a green task before - /// returning (and will kill the entire process if this is wrong). - pub fn convert(mut task: Box) -> Box { - match task.maybe_take_runtime::() { - Some(mut green) => { - green.put_task(task); - green - } - None => rtabort!("not a green task any more?"), - } - } - - pub fn give_home(&mut self, new_home: Home) { - match self.task_type { - TypeGreen(ref mut home) => { *home = Some(new_home); } - TypeSched => rtabort!("type error: used SchedTask as GreenTask"), - } - } - - pub fn take_unwrap_home(&mut self) -> Home { - match self.task_type { - TypeGreen(ref mut home) => home.take().unwrap(), - TypeSched => rtabort!("type error: used SchedTask as GreenTask"), - } - } - - // New utility functions for homes. - - pub fn is_home_no_tls(&self, sched: &Scheduler) -> bool { - match self.task_type { - TypeGreen(Some(AnySched)) => { false } - TypeGreen(Some(HomeSched(SchedHandle { sched_id: ref id, .. }))) => { - *id == sched.sched_id() - } - TypeGreen(None) => { rtabort!("task without home"); } - TypeSched => { - // Awe yea - rtabort!("type error: expected: TypeGreen, found: TaskSched"); - } - } - } - - pub fn homed(&self) -> bool { - match self.task_type { - TypeGreen(Some(AnySched)) => { false } - TypeGreen(Some(HomeSched(SchedHandle { .. }))) => { true } - TypeGreen(None) => { - rtabort!("task without home"); - } - TypeSched => { - rtabort!("type error: expected: TypeGreen, found: TaskSched"); - } - } - } - - pub fn is_sched(&self) -> bool { - match self.task_type { - TypeGreen(..) => false, TypeSched => true, - } - } - - // Unsafe functions for transferring ownership of this GreenTask across - // context switches - - pub fn as_uint(&self) -> uint { - self as *const GreenTask as uint - } - - pub unsafe fn from_uint(val: uint) -> Box { - mem::transmute(val) - } - - // Runtime glue functions and helpers - - pub fn put_with_sched(mut self: Box, sched: Box) { - assert!(self.sched.is_none()); - self.sched = Some(sched); - self.put(); - } - - pub fn put_task(&mut self, task: Box) { - assert!(self.task.is_none()); - self.task = Some(task); - } - - pub fn swap(mut self: Box) -> Box { - let mut task = self.task.take().unwrap(); - task.put_runtime(self); - return task; - } - - pub fn put(self: Box) { - assert!(self.sched.is_some()); - Local::put(self.swap()); - } - - fn terminate(mut self: Box) -> ! { - let sched = self.sched.take().unwrap(); - sched.terminate_current_task(self) - } - - // This function is used to remotely wakeup this green task back on to its - // original pool of schedulers. In order to do so, each tasks arranges a - // SchedHandle upon descheduling to be available for sending itself back to - // the original pool. - // - // Note that there is an interesting transfer of ownership going on here. We - // must relinquish ownership of the green task, but then also send the task - // over the handle back to the original scheduler. In order to safely do - // this, we leverage the already-present "nasty descheduling lock". The - // reason for doing this is that each task will bounce on this lock after - // resuming after a context switch. By holding the lock over the enqueueing - // of the task, we're guaranteed that the SchedHandle's memory will be valid - // for this entire function. - // - // An alternative would include having incredibly cheaply cloneable handles, - // but right now a SchedHandle is something like 6 allocations, so it is - // *not* a cheap operation to clone a handle. Until the day comes that we - // need to optimize this, a lock should do just fine (it's completely - // uncontended except for when the task is rescheduled). - fn reawaken_remotely(mut self: Box) { - unsafe { - let mtx = &mut self.nasty_deschedule_lock as *mut NativeMutex; - let handle = self.handle.as_mut().unwrap() as *mut SchedHandle; - let _guard = (*mtx).lock(); - (*handle).send(RunOnce(self)); - } - } -} - -impl Runtime for GreenTask { - fn yield_now(mut self: Box, cur_task: Box) { - self.put_task(cur_task); - let sched = self.sched.take().unwrap(); - sched.yield_now(self); - } - - fn maybe_yield(mut self: Box, cur_task: Box) { - self.put_task(cur_task); - let sched = self.sched.take().unwrap(); - sched.maybe_yield(self); - } - - fn deschedule(mut self: Box, - times: uint, - cur_task: Box, - f: |BlockedTask| -> Result<(), BlockedTask>) { - self.put_task(cur_task); - let mut sched = self.sched.take().unwrap(); - - // In order for this task to be reawoken in all possible contexts, we - // may need a handle back in to the current scheduler. When we're woken - // up in anything other than the local scheduler pool, this handle is - // used to send this task back into the scheduler pool. - if self.handle.is_none() { - self.handle = Some(sched.make_handle()); - self.pool_id = sched.pool_id; - } - - // This code is pretty standard, except for the usage of - // `GreenTask::convert`. Right now if we use `reawaken` directly it will - // expect for there to be a task in local TLS, but that is not true for - // this deschedule block (because the scheduler must retain ownership of - // the task while the cleanup job is running). In order to get around - // this for now, we invoke the scheduler directly with the converted - // Task => GreenTask structure. - if times == 1 { - sched.deschedule_running_task_and_then(self, |sched, task| { - match f(task) { - Ok(()) => {} - Err(t) => { - t.wake().map(|t| { - sched.enqueue_task(GreenTask::convert(t)) - }); - } - } - }); - } else { - sched.deschedule_running_task_and_then(self, |sched, task| { - for task in task.make_selectable(times) { - match f(task) { - Ok(()) => {}, - Err(task) => { - task.wake().map(|t| { - sched.enqueue_task(GreenTask::convert(t)) - }); - break - } - } - } - }); - } - } - - fn reawaken(mut self: Box, to_wake: Box) { - self.put_task(to_wake); - assert!(self.sched.is_none()); - - // Optimistically look for a local task, but if one's not available to - // inspect (in order to see if it's in the same sched pool as we are), - // then just use our remote wakeup routine and carry on! - let mut running_task: Box = match Local::try_take() { - Some(task) => task, - None => return self.reawaken_remotely() - }; - - // Waking up a green thread is a bit of a tricky situation. We have no - // guarantee about where the current task is running. The options we - // have for where this current task is running are: - // - // 1. Our original scheduler pool - // 2. Some other scheduler pool - // 3. Something that isn't a scheduler pool - // - // In order to figure out what case we're in, this is the reason that - // the `maybe_take_runtime` function exists. Using this function we can - // dynamically check to see which of these cases is the current - // situation and then dispatch accordingly. - // - // In case 1, we just use the local scheduler to resume ourselves - // immediately (if a rescheduling is possible). - // - // In case 2 and 3, we need to remotely reawaken ourself in order to be - // transplanted back to the correct scheduler pool. - match running_task.maybe_take_runtime::() { - Some(mut running_green_task) => { - running_green_task.put_task(running_task); - let sched = running_green_task.sched.take().unwrap(); - - if sched.pool_id == self.pool_id { - sched.run_task(running_green_task, self); - } else { - self.reawaken_remotely(); - - // put that thing back where it came from! - running_green_task.put_with_sched(sched); - } - } - None => { - self.reawaken_remotely(); - Local::put(running_task); - } - } - } - - fn spawn_sibling(mut self: Box, - cur_task: Box, - opts: TaskOpts, - f: proc():Send) { - self.put_task(cur_task); - - // First, set up a bomb which when it goes off will restore the local - // task unless its disarmed. This will allow us to gracefully panic from - // inside of `configure` which allocates a new task. - struct Bomb { inner: Option> } - impl Drop for Bomb { - fn drop(&mut self) { - let _ = self.inner.take().map(|task| task.put()); - } - } - let mut bomb = Bomb { inner: Some(self) }; - - // Spawns a task into the current scheduler. We allocate the new task's - // stack from the scheduler's stack pool, and then configure it - // accordingly to `opts`. Afterwards we bootstrap it immediately by - // switching to it. - // - // Upon returning, our task is back in TLS and we're good to return. - let sibling = { - let sched = bomb.inner.as_mut().unwrap().sched.as_mut().unwrap(); - GreenTask::configure(&mut sched.stack_pool, opts, f) - }; - let mut me = bomb.inner.take().unwrap(); - let sched = me.sched.take().unwrap(); - sched.run_task(me, sibling) - } - - fn stack_bounds(&self) -> (uint, uint) { - let c = self.coroutine.as_ref() - .expect("GreenTask.stack_bounds called without a coroutine"); - - // Don't return the red zone as part of the usable stack of this task, - // it's essentially an implementation detail. - (c.current_stack_segment.start() as uint + stack::RED_ZONE, - c.current_stack_segment.end() as uint) - } - - fn stack_guard(&self) -> Option { - let c = self.coroutine.as_ref() - .expect("GreenTask.stack_guard called without a coroutine"); - - Some(c.current_stack_segment.guard() as uint) - } - - fn can_block(&self) -> bool { false } - - fn wrap(self: Box) -> Box { - self as Box - } -} - -#[cfg(test)] -mod tests { - use std::rt::local::Local; - use std::rt::task::Task; - use std::task; - use std::rt::task::TaskOpts; - - use super::super::{PoolConfig, SchedPool}; - use super::GreenTask; - - fn spawn_opts(opts: TaskOpts, f: proc():Send) { - let mut pool = SchedPool::new(PoolConfig { - threads: 1, - event_loop_factory: super::super::basic::event_loop, - }); - pool.spawn(opts, f); - pool.shutdown(); - } - - #[test] - fn smoke() { - let (tx, rx) = channel(); - spawn_opts(TaskOpts::new(), proc() { - tx.send(()); - }); - rx.recv(); - } - - #[test] - fn smoke_panic() { - let (tx, rx) = channel::(); - spawn_opts(TaskOpts::new(), proc() { - let _tx = tx; - panic!() - }); - assert_eq!(rx.recv_opt(), Err(())); - } - - #[test] - fn smoke_opts() { - let mut opts = TaskOpts::new(); - opts.name = Some("test".into_maybe_owned()); - opts.stack_size = Some(20 * 4096); - let (tx, rx) = channel(); - opts.on_exit = Some(proc(r) tx.send(r)); - spawn_opts(opts, proc() {}); - assert!(rx.recv().is_ok()); - } - - #[test] - fn smoke_opts_panic() { - let mut opts = TaskOpts::new(); - let (tx, rx) = channel(); - opts.on_exit = Some(proc(r) tx.send(r)); - spawn_opts(opts, proc() { panic!() }); - assert!(rx.recv().is_err()); - } - - #[test] - fn yield_test() { - let (tx, rx) = channel(); - spawn_opts(TaskOpts::new(), proc() { - for _ in range(0u, 10) { task::deschedule(); } - tx.send(()); - }); - rx.recv(); - } - - #[test] - fn spawn_children() { - let (tx1, rx) = channel(); - spawn_opts(TaskOpts::new(), proc() { - let (tx2, rx) = channel(); - spawn(proc() { - let (tx3, rx) = channel(); - spawn(proc() { - tx3.send(()); - }); - rx.recv(); - tx2.send(()); - }); - rx.recv(); - tx1.send(()); - }); - rx.recv(); - } - - #[test] - fn spawn_inherits() { - let (tx, rx) = channel(); - spawn_opts(TaskOpts::new(), proc() { - spawn(proc() { - let mut task: Box = Local::take(); - match task.maybe_take_runtime::() { - Some(ops) => { - task.put_runtime(ops); - } - None => panic!(), - } - Local::put(task); - tx.send(()); - }); - }); - rx.recv(); - } -}