Commit 41dcec2f authored by: B bors

auto merge of #7265 : brson/rust/io-upstream, r=brson

r? @graydon, @nikomatsakis, @pcwalton, or @catamorphism

Sorry this is so huge, but it's been accumulating for about a month. There's lots of stuff here, mostly oriented toward enabling multithreaded scheduling and improving compatibility between the old and new runtimes. Adds task pinning so that we can create the 'platform thread' in servo.

[Here](https://github.com/brson/rust/blob/e1555f9b5628af2b6c6ed344cad621399cb7684d/src/libstd/rt/mod.rs#L201) is the current runtime setup code.

About half of this has already been reviewed.
......@@ -36,14 +36,6 @@
use std::uint;
use std::vec;
pub mod rustrt {
use std::libc::size_t;
#[abi = "cdecl"]
pub extern {
pub unsafe fn rust_sched_threads() -> size_t;
}
}
// The name of a test. By convention this follows the rules for rust
// paths; i.e. it should be a series of identifiers separated by double
......@@ -493,11 +485,10 @@ fn run_tests(opts: &TestOpts,
static SCHED_OVERCOMMIT : uint = 4u;
fn get_concurrency() -> uint {
unsafe {
let threads = rustrt::rust_sched_threads() as uint;
if threads == 1 { 1 }
else { threads * SCHED_OVERCOMMIT }
}
use std::rt;
let threads = rt::util::default_sched_threads();
if threads == 1 { 1 }
else { threads * SCHED_OVERCOMMIT }
}
#[allow(non_implicitly_copyable_typarams)]
......
......@@ -22,23 +22,6 @@
/// Code for dealing with @-vectors. This is pretty incomplete, and
/// contains a bunch of duplication from the code for ~-vectors.
pub mod rustrt {
use libc;
use vec;
#[cfg(stage0)]
use intrinsic::{TyDesc};
#[cfg(not(stage0))]
use unstable::intrinsics::{TyDesc};
#[abi = "cdecl"]
#[link_name = "rustrt"]
pub extern {
pub unsafe fn vec_reserve_shared_actual(t: *TyDesc,
v: **vec::raw::VecRepr,
n: libc::size_t);
}
}
/// Returns the number of elements the vector can hold without reallocating
#[inline]
pub fn capacity<T>(v: @[T]) -> uint {
......@@ -192,18 +175,17 @@ fn add(&self, rhs: &V) -> @[T] {
pub mod traits {}
pub mod raw {
use at_vec::{capacity, rustrt};
use at_vec::capacity;
use cast;
use cast::{transmute, transmute_copy};
use libc;
use ptr;
use sys;
use uint;
use unstable::intrinsics::{move_val_init};
use unstable::intrinsics;
use unstable::intrinsics::{move_val_init, TyDesc};
use vec;
#[cfg(stage0)]
use intrinsic::{get_tydesc};
#[cfg(not(stage0))]
use unstable::intrinsics::{get_tydesc};
use vec::UnboxedVecRepr;
pub type VecRepr = vec::raw::VecRepr;
pub type SliceRepr = vec::raw::SliceRepr;
......@@ -264,9 +246,49 @@ unsafe fn push_slow<T>(v: &mut @[T], initval: T) {
pub unsafe fn reserve<T>(v: &mut @[T], n: uint) {
// Only make the (slow) call into the runtime if we have to
if capacity(*v) < n {
let ptr: **VecRepr = transmute(v);
rustrt::vec_reserve_shared_actual(get_tydesc::<T>(),
ptr, n as libc::size_t);
let ptr: *mut *mut VecRepr = transmute(v);
let ty = intrinsics::get_tydesc::<T>();
// XXX transmute shouldn't be necessary
let ty = cast::transmute(ty);
return reserve_raw(ty, ptr, n);
}
}
// Implementation detail. Shouldn't be public
#[allow(missing_doc)]
pub fn reserve_raw(ty: *TyDesc, ptr: *mut *mut VecRepr, n: uint) {
unsafe {
let size_in_bytes = n * (*ty).size;
if size_in_bytes > (**ptr).unboxed.alloc {
let total_size = size_in_bytes + sys::size_of::<UnboxedVecRepr>();
// XXX: UnboxedVecRepr has an extra u8 at the end
let total_size = total_size - sys::size_of::<u8>();
(*ptr) = local_realloc(*ptr as *(), total_size) as *mut VecRepr;
(**ptr).unboxed.alloc = size_in_bytes;
}
}
fn local_realloc(ptr: *(), size: uint) -> *() {
use rt;
use rt::OldTaskContext;
use rt::local::Local;
use rt::task::Task;
if rt::context() == OldTaskContext {
unsafe {
return rust_local_realloc(ptr, size as libc::size_t);
}
extern {
#[fast_ffi]
fn rust_local_realloc(ptr: *(), size: libc::size_t) -> *();
}
} else {
do Local::borrow::<Task, *()> |task| {
task.heap.realloc(ptr as *libc::c_void, size) as *()
}
}
}
}
......
......@@ -10,105 +10,13 @@
#[doc(hidden)];
use libc::{c_char, intptr_t, uintptr_t};
use libc::c_void;
use ptr::{mut_null};
use repr::BoxRepr;
use cast::transmute;
use unstable::intrinsics::TyDesc;
#[cfg(not(test))] use unstable::lang::clear_task_borrow_list;
/**
* Runtime structures
*
* NB: These must match the representation in the C++ runtime.
*/
type TaskID = uintptr_t;
struct StackSegment { priv opaque: () }
struct Scheduler { priv opaque: () }
struct SchedulerLoop { priv opaque: () }
struct Kernel { priv opaque: () }
struct Env { priv opaque: () }
struct AllocHeader { priv opaque: () }
struct MemoryRegion { priv opaque: () }
#[cfg(target_arch="x86")]
struct Registers {
data: [u32, ..16]
}
#[cfg(target_arch="arm")]
#[cfg(target_arch="mips")]
struct Registers {
data: [u32, ..32]
}
#[cfg(target_arch="x86")]
#[cfg(target_arch="arm")]
#[cfg(target_arch="mips")]
struct Context {
regs: Registers,
next: *Context,
pad: [u32, ..3]
}
#[cfg(target_arch="x86_64")]
struct Registers {
data: [u64, ..22]
}
#[cfg(target_arch="x86_64")]
struct Context {
regs: Registers,
next: *Context,
pad: uintptr_t
}
struct BoxedRegion {
env: *Env,
backing_region: *MemoryRegion,
live_allocs: *BoxRepr
}
#[cfg(target_arch="x86")]
#[cfg(target_arch="arm")]
#[cfg(target_arch="mips")]
struct Task {
// Public fields
refcount: intptr_t, // 0
id: TaskID, // 4
pad: [u32, ..2], // 8
ctx: Context, // 16
stack_segment: *StackSegment, // 96
runtime_sp: uintptr_t, // 100
scheduler: *Scheduler, // 104
scheduler_loop: *SchedulerLoop, // 108
// Fields known only to the runtime
kernel: *Kernel, // 112
name: *c_char, // 116
list_index: i32, // 120
boxed_region: BoxedRegion // 128
}
#[cfg(target_arch="x86_64")]
struct Task {
// Public fields
refcount: intptr_t,
id: TaskID,
ctx: Context,
stack_segment: *StackSegment,
runtime_sp: uintptr_t,
scheduler: *Scheduler,
scheduler_loop: *SchedulerLoop,
// Fields known only to the runtime
kernel: *Kernel,
name: *c_char,
list_index: i32,
boxed_region: BoxedRegion
}
type DropGlue<'self> = &'self fn(**TyDesc, *c_void);
/*
* Box annihilation
......@@ -127,9 +35,9 @@ unsafe fn each_live_alloc(read_next_before: bool,
//! Walks the internal list of allocations
use managed;
use rt::local_heap;
let task: *Task = transmute(rustrt::rust_get_task());
let box = (*task).boxed_region.live_allocs;
let box = local_heap::live_allocs();
let mut box: *mut BoxRepr = transmute(copy box);
while box != mut_null() {
let next_before = transmute(copy (*box).header.next);
......@@ -151,7 +59,13 @@ unsafe fn each_live_alloc(read_next_before: bool,
#[cfg(unix)]
fn debug_mem() -> bool {
::rt::env::get().debug_mem
use rt;
use rt::OldTaskContext;
// XXX: Need to port the environment struct to newsched
match rt::context() {
OldTaskContext => ::rt::env::get().debug_mem,
_ => false
}
}
#[cfg(windows)]
......@@ -173,13 +87,12 @@ unsafe fn call_drop_glue(tydesc: *TyDesc, data: *i8) {
}
/// Destroys all managed memory (i.e. @ boxes) held by the current task.
#[cfg(not(test))]
#[lang="annihilate"]
pub unsafe fn annihilate() {
use unstable::lang::local_free;
use rt::local_heap::local_free;
use io::WriterUtil;
use io;
use libc;
use rt::borrowck;
use sys;
use managed;
......@@ -191,7 +104,7 @@ pub unsafe fn annihilate() {
// Quick hack: we need to free this list upon task exit, and this
// is a convenient place to do it.
clear_task_borrow_list();
borrowck::clear_task_borrow_list();
// Pass 1: Make all boxes immortal.
//
......@@ -213,7 +126,7 @@ pub unsafe fn annihilate() {
// callback, as the original value may have been freed.
for each_live_alloc(false) |box, uniq| {
if !uniq {
let tydesc = (*box).header.type_desc;
let tydesc: *TyDesc = transmute(copy (*box).header.type_desc);
let data = transmute(&(*box).data);
call_drop_glue(tydesc, data);
}
......
......@@ -220,7 +220,7 @@ fn peek(&self) -> bool {
/// A channel that can be shared between many senders.
pub struct SharedChan<T> {
ch: Exclusive<pipesy::Chan<T>>
inner: Either<Exclusive<pipesy::Chan<T>>, rtcomm::SharedChan<T>>
}
impl<T: Send> SharedChan<T> {
......@@ -228,40 +228,50 @@ impl<T: Send> SharedChan<T> {
pub fn new(c: Chan<T>) -> SharedChan<T> {
let Chan { inner } = c;
let c = match inner {
Left(c) => c,
Right(_) => fail!("SharedChan not implemented")
Left(c) => Left(exclusive(c)),
Right(c) => Right(rtcomm::SharedChan::new(c))
};
SharedChan { ch: exclusive(c) }
SharedChan { inner: c }
}
}
impl<T: Send> GenericChan<T> for SharedChan<T> {
fn send(&self, x: T) {
unsafe {
let mut xx = Some(x);
do self.ch.with_imm |chan| {
let x = replace(&mut xx, None);
chan.send(x.unwrap())
match self.inner {
Left(ref chan) => {
unsafe {
let mut xx = Some(x);
do chan.with_imm |chan| {
let x = replace(&mut xx, None);
chan.send(x.unwrap())
}
}
}
Right(ref chan) => chan.send(x)
}
}
}
impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
fn try_send(&self, x: T) -> bool {
unsafe {
let mut xx = Some(x);
do self.ch.with_imm |chan| {
let x = replace(&mut xx, None);
chan.try_send(x.unwrap())
match self.inner {
Left(ref chan) => {
unsafe {
let mut xx = Some(x);
do chan.with_imm |chan| {
let x = replace(&mut xx, None);
chan.try_send(x.unwrap())
}
}
}
Right(ref chan) => chan.try_send(x)
}
}
}
impl<T: Send> ::clone::Clone for SharedChan<T> {
fn clone(&self) -> SharedChan<T> {
SharedChan { ch: self.ch.clone() }
SharedChan { inner: self.inner.clone() }
}
}
......
......@@ -11,13 +11,20 @@
//! Logging
use option::*;
use os;
use either::*;
use rt;
use rt::OldTaskContext;
use rt::logging::{Logger, StdErrLogger};
/// Turns on logging to stdout globally
pub fn console_on() {
unsafe {
rustrt::rust_log_console_on();
if rt::context() == OldTaskContext {
unsafe {
rustrt::rust_log_console_on();
}
} else {
rt::logging::console_on();
}
}
......@@ -29,8 +36,17 @@ pub fn console_on() {
* the RUST_LOG environment variable
*/
pub fn console_off() {
unsafe {
rustrt::rust_log_console_off();
// If RUST_LOG is set then the console can't be turned off
if os::getenv("RUST_LOG").is_some() {
return;
}
if rt::context() == OldTaskContext {
unsafe {
rustrt::rust_log_console_off();
}
} else {
rt::logging::console_off();
}
}
......
......@@ -10,18 +10,16 @@
#[macro_escape];
macro_rules! rterrln (
($( $arg:expr),+) => ( {
::rt::util::dumb_println(fmt!( $($arg),+ ));
} )
)
// Some basic logging
macro_rules! rtdebug_ (
($( $arg:expr),+) => ( {
dumb_println(fmt!( $($arg),+ ));
fn dumb_println(s: &str) {
use io::WriterUtil;
let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
dbg.write_str(s);
dbg.write_str("\n");
}
rterrln!( $($arg),+ )
} )
)
......@@ -33,21 +31,15 @@ fn dumb_println(s: &str) {
macro_rules! rtassert (
( $arg:expr ) => ( {
if !$arg {
abort!("assertion failed: %s", stringify!($arg));
rtabort!("assertion failed: %s", stringify!($arg));
}
} )
)
macro_rules! abort(
($( $msg:expr),+) => ( {
rtdebug!($($msg),+);
do_abort();
// NB: This is in a fn to avoid putting the `unsafe` block in a macro,
// which causes spurious 'unnecessary unsafe block' warnings.
fn do_abort() -> ! {
unsafe { ::libc::abort(); }
}
macro_rules! rtabort(
($( $msg:expr),+) => ( {
::rt::util::abort(fmt!($($msg),+));
} )
)
......@@ -741,6 +741,7 @@ unsafe fn get_list(p: &Path) -> ~[~str] {
as_utf16_p
};
use rt::global_heap::malloc_raw;
#[nolink]
extern {
unsafe fn rust_list_dir_wfd_size() -> libc::size_t;
......@@ -1134,8 +1135,15 @@ unsafe fn FormatMessageA(flags: DWORD, lpSrc: LPVOID,
* ignored and the process exits with the default failure status
*/
pub fn set_exit_status(code: int) {
unsafe {
rustrt::rust_set_exit_status(code as libc::intptr_t);
use rt;
use rt::OldTaskContext;
if rt::context() == OldTaskContext {
unsafe {
rustrt::rust_set_exit_status(code as libc::intptr_t);
}
} else {
rt::util::set_exit_status(code);
}
}
......@@ -1165,10 +1173,20 @@ pub fn real_args() -> ~[~str] {
#[cfg(target_os = "android")]
#[cfg(target_os = "freebsd")]
pub fn real_args() -> ~[~str] {
unsafe {
let argc = rustrt::rust_get_argc();
let argv = rustrt::rust_get_argv();
load_argc_and_argv(argc, argv)
use rt;
use rt::TaskContext;
if rt::context() == TaskContext {
match rt::args::clone() {
Some(args) => args,
None => fail!("process arguments not initialized")
}
} else {
unsafe {
let argc = rustrt::rust_get_argc();
let argv = rustrt::rust_get_argv();
load_argc_and_argv(argc, argv)
}
}
}
......
// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Global storage for command line arguments
//!
//! The current incarnation of the Rust runtime expects for
//! the processes `argc` and `argv` arguments to be stored
//! in a globally-accessible location for use by the `os` module.
//!
//! XXX: Would be nice for this to not exist.
//! XXX: This has a lot of C glue for lack of globals.
use libc;
use option::{Option, Some, None};
use str;
use uint;
use unstable::finally::Finally;
use util;
/// One-time global initialization.
///
/// Called once at process startup with the C `argc`/`argv`; converts the
/// argument vector into owned Rust strings and stores them in the
/// process-global slot (see `put`) for later retrieval by `take`/`clone`.
pub unsafe fn init(argc: int, argv: **u8) {
    let args = load_argc_and_argv(argc, argv);
    put(args);
}
/// One-time global cleanup.
///
/// Empties the global slot; asserts that arguments had actually been
/// stored (i.e. `init`/`put` ran and nobody already took them).
pub fn cleanup() {
    rtassert!(take().is_some());
}
/// Take the global arguments from global storage.
///
/// Under the global args lock, swaps `None` into the slot and returns a
/// deep clone of whatever was stored, or `None` if the slot was empty.
pub fn take() -> Option<~[~str]> {
    with_lock(|| unsafe {
        let ptr = get_global_ptr();
        // Move the stored value out, leaving None behind.
        let val = util::replace(&mut *ptr, None);
        val.map(|s: &~~[~str]| (**s).clone())
    })
}
/// Give the global arguments to global storage.
///
/// It is an error if the arguments already exist.
pub fn put(args: ~[~str]) {
    with_lock(|| unsafe {
        let ptr = get_global_ptr();
        // The slot must be empty: storing twice without an intervening
        // `take` is a runtime bug.
        rtassert!((*ptr).is_none());
        (*ptr) = Some(~args.clone());
    })
}
/// Make a clone of the global arguments.
///
/// Non-destructive counterpart of `take`: returns a deep clone of the
/// stored arguments (or `None`) while leaving the slot untouched.
pub fn clone() -> Option<~[~str]> {
    with_lock(|| unsafe {
        let ptr = get_global_ptr();
        (*ptr).map(|s: &~~[~str]| (**s).clone())
    })
}
/// Runs `f` while holding the C runtime's global-args lock.
///
/// The lock is released in the `finally` block, so it is dropped even if
/// `f` fails (unwinds).
fn with_lock<T>(f: &fn() -> T) -> T {
    do (|| {
        unsafe {
            rust_take_global_args_lock();
            f()
        }
    }).finally {
        unsafe {
            rust_drop_global_args_lock();
        }
    }
}
/// Returns the pointer to the process-global argument slot, which lives
/// on the C side (see the extern block below) for lack of Rust globals.
fn get_global_ptr() -> *mut Option<~~[~str]> {
    unsafe { rust_get_global_args_ptr() }
}
// Copied from `os`.
/// Converts a C `argc`/`argv` pair into a vector of owned Rust strings,
/// copying each NUL-terminated argument.
unsafe fn load_argc_and_argv(argc: int, argv: **u8) -> ~[~str] {
    let mut args = ~[];
    for uint::range(0, argc as uint) |i| {
        args.push(str::raw::from_c_str(*(argv as **libc::c_char).offset(i)));
    }
    return args;
}
// C glue provided by the runtime: a global lock plus the storage cell
// itself ("C glue for lack of globals" -- see the module docs above).
extern {
    fn rust_take_global_args_lock();
    fn rust_drop_global_args_lock();
    fn rust_get_global_args_ptr() -> *mut Option<~~[~str]>;
}
#[cfg(test)]
mod tests {
    use option::{Some, None};
    use super::*;
    use unstable::finally::Finally;

    // Round-trips a value through put/clone/take while carefully saving
    // and restoring whatever real process arguments were already stored,
    // since the storage is genuinely process-global.
    #[test]
    fn smoke_test() {
        // Preserve the actual global state.
        let saved_value = take();

        let expected = ~[~"happy", ~"today?"];
        put(expected.clone());
        assert!(clone() == Some(expected.clone()));
        // First take drains the slot; the second finds it empty.
        assert!(take() == Some(expected.clone()));
        assert!(take() == None);

        do (|| {
        }).finally {
            // Restore the actual global state.
            match saved_value {
                Some(ref args) => put(args.clone()),
                None => ()
            }
        }
    }
}
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use cast::transmute;
use libc::{c_char, c_void, size_t, STDERR_FILENO};
use io;
use io::{Writer, WriterUtil};
use managed::raw::BoxRepr;
use option::{Option, None, Some};
use uint;
use str;
use str::OwnedStr;
use sys;
use vec::ImmutableVector;
#[allow(non_camel_case_types)]
type rust_task = c_void;
pub static FROZEN_BIT: uint = 1 << (uint::bits - 1);
pub static MUT_BIT: uint = 1 << (uint::bits - 2);
static ALL_BITS: uint = FROZEN_BIT | MUT_BIT;
/// A single dynamic-borrow event: the box that was borrowed plus the
/// source location (file/line) where the borrow was created. Used to
/// build helpful messages in `fail_borrowed`.
#[deriving(Eq)]
struct BorrowRecord {
    box: *mut BoxRepr,
    file: *c_char,
    line: size_t
}
/// Removes and returns the current task's borrow-record list.
///
/// Returns `None` when there is no current task, or when the task has a
/// null list pointer (i.e. it is not recording borrows).
fn try_take_task_borrow_list() -> Option<~[BorrowRecord]> {
    unsafe {
        let cur_task: *rust_task = rust_try_get_task();
        if cur_task.is_not_null() {
            let ptr = rust_take_task_borrow_list(cur_task);
            if ptr.is_null() {
                None
            } else {
                // Ownership of the list transfers back to Rust here.
                let v: ~[BorrowRecord] = transmute(ptr);
                Some(v)
            }
        } else {
            None
        }
    }
}
/// Takes the current task's borrow list (an empty list if none is
/// stored), applies `f` to it, and stores the result back on the task.
/// A no-op when there is no current task.
fn swap_task_borrow_list(f: &fn(~[BorrowRecord]) -> ~[BorrowRecord]) {
    unsafe {
        let cur_task: *rust_task = rust_try_get_task();
        if cur_task.is_not_null() {
            let mut borrow_list: ~[BorrowRecord] = {
                let ptr = rust_take_task_borrow_list(cur_task);
                if ptr.is_null() { ~[] } else { transmute(ptr) }
            };
            borrow_list = f(borrow_list);
            // Hand ownership of the (possibly new) list back to the task.
            rust_set_task_borrow_list(cur_task, transmute(borrow_list));
        }
    }
}
/// Frees the current task's borrow list by taking it and letting the
/// owned vector drop.
pub unsafe fn clear_task_borrow_list() {
    // pub because it is used by the box annihilator.
    let _ = try_take_task_borrow_list();
}
/// Reports a dynamic borrow violation by unwinding the current task.
///
/// If the task is recording borrows, the failure message lists the
/// file:line of every recorded borrow of this particular box; otherwise
/// it is just the bare message "borrowed".
unsafe fn fail_borrowed(box: *mut BoxRepr, file: *c_char, line: size_t) {
    debug_borrow("fail_borrowed: ", box, 0, 0, file, line);

    match try_take_task_borrow_list() {
        None => { // not recording borrows
            let msg = "borrowed";
            do str::as_buf(msg) |msg_p, _| {
                sys::begin_unwind_(msg_p as *c_char, file, line);
            }
        }
        Some(borrow_list) => { // recording borrows
            let mut msg = ~"borrowed";
            let mut sep = " at ";
            // Walk the records newest-first, appending the location of
            // each borrow that involved this same box.
            for borrow_list.rev_iter().advance |entry| {
                if entry.box == box {
                    msg.push_str(sep);
                    let filename = str::raw::from_c_str(entry.file);
                    msg.push_str(filename);
                    msg.push_str(fmt!(":%u", entry.line as uint));
                    sep = " and at ";
                }
            }
            do str::as_buf(msg) |msg_p, _| {
                sys::begin_unwind_(msg_p as *c_char, file, line)
            }
        }
    }
}
/// Because this code is so perf. sensitive, use a static constant so that
/// debug printouts are compiled out most of the time.
static ENABLE_DEBUG: bool = false;
#[inline]
unsafe fn debug_borrow<T>(tag: &'static str,
                          p: *const T,
                          old_bits: uint,
                          new_bits: uint,
                          filename: *c_char,
                          line: size_t) {
    //! A useful debugging function that prints a pointer + tag + newline
    //! without allocating memory.

    // Gated on both the compile-time ENABLE_DEBUG constant (so the slow
    // path is compiled out by default) and the runtime env flag.
    if ENABLE_DEBUG && ::rt::env::get().debug_borrow {
        debug_borrow_slow(tag, p, old_bits, new_bits, filename, line);
    }

    unsafe fn debug_borrow_slow<T>(tag: &'static str,
                                   p: *const T,
                                   old_bits: uint,
                                   new_bits: uint,
                                   filename: *c_char,
                                   line: size_t) {
        // Writes straight to the stderr fd: "<tag><ptr> <old> <new> <file>:<line>\n",
        // all in hex, with no heap allocation.
        let dbg = STDERR_FILENO as io::fd_t;
        dbg.write_str(tag);
        dbg.write_hex(p as uint);
        dbg.write_str(" ");
        dbg.write_hex(old_bits);
        dbg.write_str(" ");
        dbg.write_hex(new_bits);
        dbg.write_str(" ");
        dbg.write_cstr(filename);
        dbg.write_str(":");
        dbg.write_hex(line as uint);
        dbg.write_str("\n");
    }
}
/// Allocation-free debug printing helpers for a raw file descriptor,
/// used by `debug_borrow_slow` above.
trait DebugPrints {
    // Write `val` as fixed-width hexadecimal.
    fn write_hex(&self, val: uint);
    // Write a NUL-terminated C string.
    unsafe fn write_cstr(&self, str: *c_char);
}
impl DebugPrints for io::fd_t {
    /// Writes `i` as fixed-width hex (one character per nibble of a
    /// `uint`) into a stack buffer -- no heap allocation, so this is
    /// safe to use while diagnosing borrow failures.
    fn write_hex(&self, mut i: uint) {
        let letters = ['0', '1', '2', '3', '4', '5', '6', '7', '8',
                       '9', 'a', 'b', 'c', 'd', 'e', 'f'];
        static UINT_NIBBLES: uint = ::uint::bytes << 1;
        let mut buffer = [0_u8, ..UINT_NIBBLES+1];
        let mut c = UINT_NIBBLES;
        // Fill from the least-significant nibble backwards so the most
        // significant digit ends up first.
        while c > 0 {
            c -= 1;
            buffer[c] = letters[i & 0xF] as u8;
            i >>= 4;
        }
        self.write(buffer.slice(0, UINT_NIBBLES));
    }

    /// Writes a NUL-terminated C string byte-for-byte by viewing it as
    /// a slice, without copying it into a Rust string first.
    unsafe fn write_cstr(&self, p: *c_char) {
        use libc::strlen;
        use vec;

        let len = strlen(p);
        let p: *u8 = transmute(p);
        do vec::raw::buf_as_slice(p, len as uint) |s| {
            self.write(s);
        }
    }
}
/// Marks the box as immutably borrowed by setting FROZEN_BIT in its
/// ref-count word, failing the task if it is currently mutably borrowed
/// (MUT_BIT set). Returns the previous ref-count word so the caller can
/// later restore it via `return_to_mut`.
#[inline]
pub unsafe fn borrow_as_imm(a: *u8, file: *c_char, line: size_t) -> uint {
    let a: *mut BoxRepr = transmute(a);
    let old_ref_count = (*a).header.ref_count;
    let new_ref_count = old_ref_count | FROZEN_BIT;

    debug_borrow("borrow_as_imm:", a, old_ref_count, new_ref_count, file, line);

    // An existing mutable borrow makes a new immutable borrow illegal.
    if (old_ref_count & MUT_BIT) != 0 {
        fail_borrowed(a, file, line);
    }

    (*a).header.ref_count = new_ref_count;

    old_ref_count
}
/// Marks the box as mutably borrowed by setting both MUT_BIT and
/// FROZEN_BIT, failing the task if any borrow (mutable or immutable) is
/// already active. Returns the previous ref-count word for later
/// restoration via `return_to_mut`.
#[inline]
pub unsafe fn borrow_as_mut(a: *u8, file: *c_char, line: size_t) -> uint {
    let a: *mut BoxRepr = transmute(a);
    let old_ref_count = (*a).header.ref_count;
    let new_ref_count = old_ref_count | MUT_BIT | FROZEN_BIT;

    debug_borrow("borrow_as_mut:", a, old_ref_count, new_ref_count, file, line);

    // A mutable borrow is exclusive: any prior borrow bit is a violation.
    if (old_ref_count & (MUT_BIT|FROZEN_BIT)) != 0 {
        fail_borrowed(a, file, line);
    }

    (*a).header.ref_count = new_ref_count;

    old_ref_count
}
/// Appends a `BorrowRecord` for box `a` to the task's borrow list --
/// but only when this is the first borrow (no borrow bits were set in
/// `old_ref_count`), so nested borrows are not recorded twice.
pub unsafe fn record_borrow(a: *u8, old_ref_count: uint,
                            file: *c_char, line: size_t) {
    if (old_ref_count & ALL_BITS) == 0 {
        // was not borrowed before
        let a: *mut BoxRepr = transmute(a);
        debug_borrow("record_borrow:", a, old_ref_count, 0, file, line);
        do swap_task_borrow_list |borrow_list| {
            let mut borrow_list = borrow_list;
            borrow_list.push(BorrowRecord {box: a, file: file, line: line});
            borrow_list
        }
    }
}
/// Pops the matching `BorrowRecord` for box `a` from the task's borrow
/// list (mirror of `record_borrow`, so it only acts when the original
/// borrow was the first one). Borrows nest LIFO, so the matching record
/// must be the last one pushed; if the popped record does not match,
/// the task is unwound with a diagnostic.
pub unsafe fn unrecord_borrow(a: *u8, old_ref_count: uint,
                              file: *c_char, line: size_t) {
    if (old_ref_count & ALL_BITS) == 0 {
        // was not borrowed before, so we should find the record at
        // the end of the list
        let a: *mut BoxRepr = transmute(a);
        debug_borrow("unrecord_borrow:", a, old_ref_count, 0, file, line);
        do swap_task_borrow_list |borrow_list| {
            let mut borrow_list = borrow_list;
            assert!(!borrow_list.is_empty());
            let br = borrow_list.pop();
            if br.box != a || br.file != file || br.line != line {
                let err = fmt!("wrong borrow found, br=%?", br);
                do str::as_buf(err) |msg_p, _| {
                    sys::begin_unwind_(msg_p as *c_char, file, line)
                }
            }
            borrow_list
        }
    }
}
/// Ends a borrow by restoring the borrow bits that were saved from the
/// ref-count word by `borrow_as_imm`/`borrow_as_mut`, while preserving
/// the current non-borrow (count) bits.
#[inline]
pub unsafe fn return_to_mut(a: *u8, orig_ref_count: uint,
                            file: *c_char, line: size_t) {
    // Sometimes the box is null, if it is conditionally frozen.
    // See e.g. #4904.
    if !a.is_null() {
        let a: *mut BoxRepr = transmute(a);
        let old_ref_count = (*a).header.ref_count;
        // Keep the current count bits, restore only the original
        // FROZEN/MUT bits.
        let new_ref_count =
            (old_ref_count & !ALL_BITS) | (orig_ref_count & ALL_BITS);

        debug_borrow("return_to_mut:",
                     a, old_ref_count, new_ref_count, file, line);

        (*a).header.ref_count = new_ref_count;
    }
}
/// Unwinds the task if the box is currently frozen (FROZEN_BIT set),
/// i.e. if it has an outstanding immutable borrow.
#[inline]
pub unsafe fn check_not_borrowed(a: *u8,
                                 file: *c_char,
                                 line: size_t) {
    let a: *mut BoxRepr = transmute(a);
    let ref_count = (*a).header.ref_count;
    debug_borrow("check_not_borrowed:", a, ref_count, 0, file, line);
    if (ref_count & FROZEN_BIT) != 0 {
        fail_borrowed(a, file, line);
    }
}
// Runtime entry points for the per-task borrow list, keyed by the
// opaque task pointer. NOTE(review): `#[rust_stack]` presumably marks
// these as callable directly on the Rust stack (no stack switch) --
// confirm against the C++ runtime.
extern {
    #[rust_stack]
    pub fn rust_take_task_borrow_list(task: *rust_task) -> *c_void;
    #[rust_stack]
    pub fn rust_set_task_borrow_list(task: *rust_task, map: *c_void);
    #[rust_stack]
    pub fn rust_try_get_task() -> *rust_task;
}
......@@ -19,13 +19,16 @@
use cast;
use util;
use ops::Drop;
use rt::task::Task;
use kinds::Send;
use rt::sched::{Scheduler, Coroutine};
use rt::sched::Scheduler;
use rt::local::Local;
use unstable::intrinsics::{atomic_xchg, atomic_load};
use unstable::atomics::{AtomicUint, AtomicOption, SeqCst};
use unstable::sync::UnsafeAtomicRcBox;
use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cell::Cell;
use clone::Clone;
/// A combined refcount / ~Task pointer.
///
......@@ -34,14 +37,14 @@
/// * 2 - both endpoints are alive
/// * 1 - either the sender or the receiver is dead, determined by context
/// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task
type State = int;
type State = uint;
static STATE_BOTH: State = 2;
static STATE_ONE: State = 1;
/// The heap-allocated structure shared between two endpoints.
struct Packet<T> {
state: State,
state: AtomicUint,
payload: Option<T>,
}
......@@ -70,7 +73,7 @@ pub struct PortOneHack<T> {
pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
let packet: ~Packet<T> = ~Packet {
state: STATE_BOTH,
state: AtomicUint::new(STATE_BOTH),
payload: None
};
......@@ -114,20 +117,30 @@ pub fn try_send(self, val: T) -> bool {
// reordering of the payload write. This also issues an
// acquire barrier that keeps the subsequent access of the
// ~Task pointer from being reordered.
let oldstate = atomic_xchg(&mut (*packet).state, STATE_ONE);
let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Port is not waiting yet. Nothing to do
do Local::borrow::<Scheduler, ()> |sched| {
rtdebug!("non-rendezvous send");
sched.metrics.non_rendezvous_sends += 1;
}
}
STATE_ONE => {
do Local::borrow::<Scheduler, ()> |sched| {
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
}
// Port has closed. Need to clean up.
let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
recvr_active = false;
}
task_as_state => {
// Port is blocked. Wake it up.
let recvr: ~Coroutine = cast::transmute(task_as_state);
let sched = Local::take::<Scheduler>();
let recvr: ~Task = cast::transmute(task_as_state);
let mut sched = Local::take::<Scheduler>();
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
sched.schedule_task(recvr);
}
}
......@@ -158,23 +171,30 @@ pub fn try_recv(self) -> Option<T> {
// Switch to the scheduler to put the ~Task into the Packet state.
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
do sched.deschedule_running_task_and_then |sched, task| {
unsafe {
// Atomically swap the task pointer into the Packet state, issuing
// an acquire barrier to prevent reordering of the subsequent read
// of the payload. Also issues a release barrier to prevent reordering
// of any previous writes to the task structure.
let task_as_state: State = cast::transmute(task);
let oldstate = atomic_xchg(&mut (*packet).state, task_as_state);
let oldstate = (*packet).state.swap(task_as_state, SeqCst);
match oldstate {
STATE_BOTH => {
// Data has not been sent. Now we're blocked.
rtdebug!("non-rendezvous recv");
sched.metrics.non_rendezvous_recvs += 1;
}
STATE_ONE => {
rtdebug!("rendezvous recv");
sched.metrics.rendezvous_recvs += 1;
// Channel is closed. Switch back and check the data.
let task: ~Coroutine = cast::transmute(task_as_state);
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
// NB: We have to drop back into the scheduler event loop here
// instead of switching immediately back or we could end up
// triggering infinite recursion on the scheduler's stack.
let task: ~Task = cast::transmute(task_as_state);
sched.enqueue_task(task);
}
_ => util::unreachable()
}
......@@ -210,7 +230,7 @@ impl<T> Peekable<T> for PortOne<T> {
fn peek(&self) -> bool {
unsafe {
let packet: *mut Packet<T> = self.inner.packet();
let oldstate = atomic_load(&mut (*packet).state);
let oldstate = (*packet).state.load(SeqCst);
match oldstate {
STATE_BOTH => false,
STATE_ONE => (*packet).payload.is_some(),
......@@ -227,7 +247,7 @@ fn drop(&self) {
unsafe {
let this = cast::transmute_mut(self);
let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Port still active. It will destroy the Packet.
......@@ -238,7 +258,7 @@ fn drop(&self) {
task_as_state => {
// The port is blocked waiting for a message we will never send. Wake it.
assert!((*this.packet()).payload.is_none());
let recvr: ~Coroutine = cast::transmute(task_as_state);
let recvr: ~Task = cast::transmute(task_as_state);
let sched = Local::take::<Scheduler>();
sched.schedule_task(recvr);
}
......@@ -254,7 +274,7 @@ fn drop(&self) {
unsafe {
let this = cast::transmute_mut(self);
let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Chan still active. It will destroy the packet.
......@@ -295,16 +315,19 @@ struct StreamPayload<T> {
next: PortOne<StreamPayload<T>>
}
type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
type StreamPortOne<T> = PortOne<StreamPayload<T>>;
/// A channel with unbounded size.
pub struct Chan<T> {
// FIXME #5372. Using Cell because we don't take &mut self
next: Cell<ChanOne<StreamPayload<T>>>
next: Cell<StreamChanOne<T>>
}
/// An port with unbounded size.
pub struct Port<T> {
// FIXME #5372. Using Cell because we don't take &mut self
next: Cell<PortOne<StreamPayload<T>>>
next: Cell<StreamPortOne<T>>
}
pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
......@@ -357,6 +380,136 @@ fn peek(&self) -> bool {
}
}
pub struct SharedChan<T> {
// Just like Chan, but a shared AtomicOption instead of Cell
priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
}
impl<T> SharedChan<T> {
pub fn new(chan: Chan<T>) -> SharedChan<T> {
let next = chan.next.take();
let next = AtomicOption::new(~next);
SharedChan { next: UnsafeAtomicRcBox::new(next) }
}
}
impl<T: Send> GenericChan<T> for SharedChan<T> {
fn send(&self, val: T) {
self.try_send(val);
}
}
impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
fn try_send(&self, val: T) -> bool {
unsafe {
let (next_pone, next_cone) = oneshot();
let cone = (*self.next.get()).swap(~next_cone, SeqCst);
cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
}
}
}
impl<T> Clone for SharedChan<T> {
fn clone(&self) -> SharedChan<T> {
SharedChan {
next: self.next.clone()
}
}
}
pub struct SharedPort<T> {
// The next port on which we will receive the next port on which we will receive T
priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>>
}
impl<T> SharedPort<T> {
pub fn new(port: Port<T>) -> SharedPort<T> {
// Put the data port into a new link pipe
let next_data_port = port.next.take();
let (next_link_port, next_link_chan) = oneshot();
next_link_chan.send(next_data_port);
let next_link = AtomicOption::new(~next_link_port);
SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) }
}
}
impl<T: Send> GenericPort<T> for SharedPort<T> {
fn recv(&self) -> T {
match self.try_recv() {
Some(val) => val,
None => {
fail!("receiving on a closed channel");
}
}
}
fn try_recv(&self) -> Option<T> {
unsafe {
let (next_link_port, next_link_chan) = oneshot();
let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
let link_port = link_port.unwrap();
let data_port = link_port.recv();
let (next_data_port, res) = match data_port.try_recv() {
Some(StreamPayload { val, next }) => {
(next, Some(val))
}
None => {
let (next_data_port, _) = oneshot();
(next_data_port, None)
}
};
next_link_chan.send(next_data_port);
return res;
}
}
}
impl<T> Clone for SharedPort<T> {
fn clone(&self) -> SharedPort<T> {
SharedPort {
next_link: self.next_link.clone()
}
}
}
// XXX: Need better name
type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
pub fn megapipe<T: Send>() -> MegaPipe<T> {
let (port, chan) = stream();
(SharedPort::new(port), SharedChan::new(chan))
}
impl<T: Send> GenericChan<T> for MegaPipe<T> {
fn send(&self, val: T) {
match *self {
(_, ref c) => c.send(val)
}
}
}
impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
fn try_send(&self, val: T) -> bool {
match *self {
(_, ref c) => c.try_send(val)
}
}
}
impl<T: Send> GenericPort<T> for MegaPipe<T> {
fn recv(&self) -> T {
match *self {
(ref p, _) => p.recv()
}
}
fn try_recv(&self) -> Option<T> {
match *self {
(ref p, _) => p.try_recv()
}
}
}
#[cfg(test)]
mod test {
use super::*;
......@@ -402,6 +555,8 @@ fn oneshot_single_thread_recv_chan_close() {
{ let _c = chan; }
port.recv();
};
// What is our res?
rtdebug!("res is: %?", res.is_err());
assert!(res.is_err());
}
}
......@@ -584,7 +739,7 @@ fn oneshot_multi_thread_send_recv_stress() {
#[test]
fn stream_send_recv_stress() {
for stress_factor().times {
do run_in_newsched_task {
do run_in_mt_newsched_task {
let (port, chan) = stream::<~int>();
send(chan, 0);
......@@ -594,18 +749,18 @@ fn send(chan: Chan<~int>, i: int) {
if i == 10 { return }
let chan_cell = Cell::new(chan);
let _thread = do spawntask_thread {
do spawntask_random {
let chan = chan_cell.take();
chan.send(~i);
send(chan, i + 1);
};
}
}
fn recv(port: Port<~int>, i: int) {
if i == 10 { return }
let port_cell = Cell::new(port);
let _thread = do spawntask_thread {
do spawntask_random {
let port = port_cell.take();
assert!(port.recv() == ~i);
recv(port, i + 1);
......@@ -614,4 +769,144 @@ fn recv(port: Port<~int>, i: int) {
}
}
}
#[test]
fn recv_a_lot() {
// Regression test that we don't run out of stack in scheduler context
do run_in_newsched_task {
let (port, chan) = stream();
for 10000.times { chan.send(()) }
for 10000.times { port.recv() }
}
}
#[test]
fn shared_chan_stress() {
// Stress SharedChan cloning: `total` randomly-scheduled tasks each
// send one message through their own clone, and the single receiver
// must observe exactly `total` messages.
do run_in_mt_newsched_task {
let (port, chan) = stream();
let chan = SharedChan::new(chan);
let total = stress_factor() + 100;
for total.times {
let chan_clone = chan.clone();
do spawntask_random {
chan_clone.send(());
}
}
for total.times {
port.recv();
}
}
}
#[test]
fn shared_port_stress() {
// Stress SharedPort cloning: `total` randomly-scheduled tasks each
// receive one message through their own clone of the port, then
// signal completion on `end_chan`; the main task sends `total`
// messages and waits for `total` completion signals.
do run_in_mt_newsched_task {
// XXX: Removing these type annotations causes an ICE
let (end_port, end_chan) = stream::<()>();
let (port, chan) = stream::<()>();
let end_chan = SharedChan::new(end_chan);
let port = SharedPort::new(port);
let total = stress_factor() + 100;
for total.times {
let end_chan_clone = end_chan.clone();
let port_clone = port.clone();
do spawntask_random {
port_clone.recv();
end_chan_clone.send(());
}
}
for total.times {
chan.send(());
}
for total.times {
end_port.recv();
}
}
}
#[test]
fn shared_port_close_simple() {
// Dropping the only sender must make try_recv on the shared port
// report closure by returning None.
do run_in_mt_newsched_task {
let (port, chan) = stream::<()>();
let port = SharedPort::new(port);
// Scope-drop the channel to close the stream.
{ let _chan = chan; }
assert!(port.try_recv().is_none());
}
}
#[test]
fn shared_port_close() {
// Mix successful receives with receives-after-close: 10 tasks send
// one message each, 20 tasks attempt try_recv, and each attempt
// reports its success on `end_chan`. Exactly `send_total` attempts
// should succeed; the remainder should observe the closed stream.
do run_in_mt_newsched_task {
let (end_port, end_chan) = stream::<bool>();
let (port, chan) = stream::<()>();
let end_chan = SharedChan::new(end_chan);
let port = SharedPort::new(port);
let chan = SharedChan::new(chan);
let send_total = 10;
let recv_total = 20;
// Spawn the senders, each through its own clone of the channel.
do spawntask_random {
for send_total.times {
let chan_clone = chan.clone();
do spawntask_random {
chan_clone.send(());
}
}
}
let end_chan_clone = end_chan.clone();
// Spawn the receivers; each reports whether its try_recv succeeded.
do spawntask_random {
for recv_total.times {
let port_clone = port.clone();
let end_chan_clone = end_chan_clone.clone();
do spawntask_random {
let recvd = port_clone.try_recv().is_some();
end_chan_clone.send(recvd);
}
}
}
// Tally the successes reported by the receiver tasks.
let mut recvd = 0;
for recv_total.times {
recvd += if end_port.recv() { 1 } else { 0 };
}
assert!(recvd == send_total);
}
}
#[test]
fn megapipe_stress() {
use rand;
use rand::RngUtil;
// Stress the megapipe: each of `total` tasks picks a random message
// count, sends that many unit messages through its clone of the
// pipe, then receives the same number back. The main task waits for
// every task to be spawned via `end_chan`/`end_port`.
// NOTE(review): end_chan_clone.send is called in the spawning loop,
// not inside the spawned task, so this waits for spawns rather than
// task completion — presumably intentional; confirm.
do run_in_mt_newsched_task {
let (end_port, end_chan) = stream::<()>();
let end_chan = SharedChan::new(end_chan);
let pipe = megapipe();
let total = stress_factor() + 10;
let mut rng = rand::rng();
for total.times {
let msgs = rng.gen_uint_range(0, 10);
let pipe_clone = pipe.clone();
let end_chan_clone = end_chan.clone();
do spawntask_random {
for msgs.times {
pipe_clone.send(());
}
for msgs.times {
pipe_clone.recv();
}
}
end_chan_clone.send(());
}
for total.times {
end_port.recv();
}
}
}
}
......@@ -8,7 +8,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::{c_char, c_void, size_t, uintptr_t, free, malloc, realloc};
use libc::{c_void, c_char, size_t, uintptr_t, free, malloc, realloc};
use managed::raw::{BoxHeaderRepr, BoxRepr};
use unstable::intrinsics::TyDesc;
use sys::size_of;
......@@ -95,6 +95,11 @@ pub unsafe fn vector_exchange_malloc(align: u32, size: uintptr_t) -> *c_char {
// FIXME: #7496
#[cfg(not(test))]
#[lang="closure_exchange_malloc"]
#[inline]
pub unsafe fn closure_exchange_malloc_(td: *c_char, size: uintptr_t) -> *c_char {
closure_exchange_malloc(td, size)
}
#[inline]
pub unsafe fn closure_exchange_malloc(td: *c_char, size: uintptr_t) -> *c_char {
let td = td as *TyDesc;
......@@ -115,6 +120,11 @@ pub unsafe fn closure_exchange_malloc(td: *c_char, size: uintptr_t) -> *c_char {
// inside a landing pad may corrupt the state of the exception handler.
#[cfg(not(test))]
#[lang="exchange_free"]
#[inline]
pub unsafe fn exchange_free_(ptr: *c_char) {
exchange_free(ptr)
}
#[inline]
pub unsafe fn exchange_free(ptr: *c_char) {
free(ptr as *c_void);
......
......@@ -8,7 +8,10 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
type Port = u16;
#[deriving(Eq, TotalEq)]
pub enum IpAddr {
Ipv4(u8, u8, u8, u8, u16),
Ipv6
Ipv4(u8, u8, u8, u8, Port),
Ipv6(u16, u16, u16, u16, u16, u16, u16, u16, Port)
}
......@@ -18,15 +18,11 @@
RtioTcpStream, RtioTcpStreamObject};
use rt::local::Local;
pub struct TcpStream {
rtstream: ~RtioTcpStreamObject
}
pub struct TcpStream(~RtioTcpStreamObject);
impl TcpStream {
fn new(s: ~RtioTcpStreamObject) -> TcpStream {
TcpStream {
rtstream: s
}
TcpStream(s)
}
pub fn connect(addr: IpAddr) -> Option<TcpStream> {
......@@ -38,13 +34,11 @@ pub fn connect(addr: IpAddr) -> Option<TcpStream> {
};
match stream {
Ok(s) => {
Some(TcpStream::new(s))
}
Ok(s) => Some(TcpStream::new(s)),
Err(ioerr) => {
rtdebug!("failed to connect: %?", ioerr);
io_error::cond.raise(ioerr);
return None;
None
}
}
}
......@@ -52,8 +46,7 @@ pub fn connect(addr: IpAddr) -> Option<TcpStream> {
impl Reader for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
let bytes_read = self.rtstream.read(buf);
match bytes_read {
match (**self).read(buf) {
Ok(read) => Some(read),
Err(ioerr) => {
// EOF is indicated by returning None
......@@ -70,8 +63,7 @@ fn eof(&mut self) -> bool { fail!() }
impl Writer for TcpStream {
fn write(&mut self, buf: &[u8]) {
let res = self.rtstream.write(buf);
match res {
match (**self).write(buf) {
Ok(_) => (),
Err(ioerr) => {
io_error::cond.raise(ioerr);
......@@ -82,9 +74,7 @@ fn write(&mut self, buf: &[u8]) {
fn flush(&mut self) { fail!() }
}
pub struct TcpListener {
rtlistener: ~RtioTcpListenerObject,
}
pub struct TcpListener(~RtioTcpListenerObject);
impl TcpListener {
pub fn bind(addr: IpAddr) -> Option<TcpListener> {
......@@ -93,11 +83,7 @@ pub fn bind(addr: IpAddr) -> Option<TcpListener> {
(*io).tcp_bind(addr)
};
match listener {
Ok(l) => {
Some(TcpListener {
rtlistener: l
})
}
Ok(l) => Some(TcpListener(l)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
return None;
......@@ -108,8 +94,7 @@ pub fn bind(addr: IpAddr) -> Option<TcpListener> {
impl Listener<TcpStream> for TcpListener {
fn accept(&mut self) -> Option<TcpStream> {
let rtstream = self.rtlistener.accept();
match rtstream {
match (**self).accept() {
Ok(s) => {
Some(TcpStream::new(s))
}
......@@ -163,7 +148,7 @@ fn connect_error() {
}
#[test]
fn smoke_test() {
fn smoke_test_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
......@@ -183,7 +168,27 @@ fn smoke_test() {
}
#[test]
fn read_eof() {
fn smoke_test_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
}
do spawntask_immediately {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
}
}
#[test]
fn read_eof_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
......@@ -203,7 +208,27 @@ fn read_eof() {
}
#[test]
fn read_eof_twice() {
fn read_eof_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
}
do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}
#[test]
fn read_eof_twice_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
......@@ -225,7 +250,29 @@ fn read_eof_twice() {
}
#[test]
fn write_close() {
fn read_eof_twice_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
let nread = stream.read(buf);
assert!(nread.is_none());
}
do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}
#[test]
fn write_close_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
......@@ -254,7 +301,36 @@ fn write_close() {
}
#[test]
fn multiple_connect_serial() {
fn write_close_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let buf = [0];
loop {
let mut stop = false;
do io_error::cond.trap(|e| {
// NB: ECONNRESET on linux, EPIPE on mac
assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
stop = true;
}).in {
stream.write(buf);
}
if stop { break }
}
}
do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}
#[test]
fn multiple_connect_serial_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
let max = 10;
......@@ -279,7 +355,32 @@ fn multiple_connect_serial() {
}
#[test]
fn multiple_connect_interleaved_greedy_schedule() {
fn multiple_connect_serial_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
let max = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for max.times {
let mut stream = listener.accept();
let mut buf = [0];
stream.read(buf);
assert_eq!(buf[0], 99);
}
}
do spawntask_immediately {
for max.times {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
}
}
}
#[test]
fn multiple_connect_interleaved_greedy_schedule_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: int = 10;
......@@ -318,7 +419,46 @@ fn connect(i: int, addr: IpAddr) {
}
#[test]
fn multiple_connect_interleaved_lazy_schedule() {
fn multiple_connect_interleaved_greedy_schedule_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
static MAX: int = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |i| {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_immediately {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == i as u8);
rtdebug!("read");
}
}
}
connect(0, addr);
fn connect(i: int, addr: IpAddr) {
if i == MAX { return }
do spawntask_immediately {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
connect(i + 1, addr);
rtdebug!("writing");
stream.write([i as u8]);
}
}
}
}
#[test]
fn multiple_connect_interleaved_lazy_schedule_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: int = 10;
......@@ -355,5 +495,43 @@ fn connect(i: int, addr: IpAddr) {
}
}
}
#[test]
fn multiple_connect_interleaved_lazy_schedule_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
static MAX: int = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |_| {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_later {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
rtdebug!("read");
}
}
}
connect(0, addr);
fn connect(i: int, addr: IpAddr) {
if i == MAX { return }
do spawntask_later {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
connect(i + 1, addr);
rtdebug!("writing");
stream.write([99]);
}
}
}
}
}
......@@ -8,38 +8,247 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use super::super::*;
use super::ip::IpAddr;
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::io::net::ip::IpAddr;
use rt::io::{Reader, Writer};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{RtioUdpSocketObject, RtioUdpSocket, IoFactory, IoFactoryObject};
use rt::local::Local;
pub struct UdpStream;
pub struct UdpSocket(~RtioUdpSocketObject);
impl UdpStream {
pub fn connect(_addr: IpAddr) -> Option<UdpStream> {
fail!()
impl UdpSocket {
pub fn bind(addr: IpAddr) -> Option<UdpSocket> {
let socket = unsafe { (*Local::unsafe_borrow::<IoFactoryObject>()).udp_bind(addr) };
match socket {
Ok(s) => Some(UdpSocket(s)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
}
}
pub fn recvfrom(&self, buf: &mut [u8]) -> Option<(uint, IpAddr)> {
match (**self).recvfrom(buf) {
Ok((nread, src)) => Some((nread, src)),
Err(ioerr) => {
// EOF is indicated by returning None
if ioerr.kind != EndOfFile {
read_error::cond.raise(ioerr);
}
None
}
}
}
pub fn sendto(&self, buf: &[u8], dst: IpAddr) {
match (**self).sendto(buf, dst) {
Ok(_) => (),
Err(ioerr) => io_error::cond.raise(ioerr),
}
}
pub fn connect(self, other: IpAddr) -> UdpStream {
UdpStream { socket: self, connectedTo: other }
}
}
// A connected view of a UDP socket: the socket itself plus the single
// peer address (`connectedTo`) that reads are filtered against and
// writes are addressed to.
pub struct UdpStream {
socket: UdpSocket,
connectedTo: IpAddr
}
impl UdpStream {
// Run `f` with a borrow of the underlying socket and return its result.
pub fn as_socket<T>(&self, f: &fn(&UdpSocket) -> T) -> T { f(&self.socket) }
// Consume the stream, giving back the underlying (unconnected) socket.
pub fn disconnect(self) -> UdpSocket { self.socket }
}
impl Reader for UdpStream {
fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
do self.as_socket |sock| {
match sock.recvfrom(buf) {
Some((_nread, src)) if src != self.connectedTo => Some(0),
Some((nread, _src)) => Some(nread),
None => None,
}
}
}
fn eof(&mut self) -> bool { fail!() }
}
impl Writer for UdpStream {
fn write(&mut self, _buf: &[u8]) { fail!() }
fn write(&mut self, buf: &[u8]) {
do self.as_socket |sock| {
sock.sendto(buf, self.connectedTo);
}
}
fn flush(&mut self) { fail!() }
}
pub struct UdpListener;
#[cfg(test)]
mod test {
use super::*;
use rt::test::*;
use rt::io::net::ip::Ipv4;
use rt::io::*;
use option::{Some, None};
impl UdpListener {
pub fn bind(_addr: IpAddr) -> Option<UdpListener> {
fail!()
#[test] #[ignore]
fn bind_error() {
do run_in_newsched_task {
let mut called = false;
do io_error::cond.trap(|e| {
assert!(e.kind == PermissionDenied);
called = true;
}).in {
let addr = Ipv4(0, 0, 0, 0, 1);
let socket = UdpSocket::bind(addr);
assert!(socket.is_none());
}
assert!(called);
}
}
}
impl Listener<UdpStream> for UdpListener {
fn accept(&mut self) -> Option<UdpStream> { fail!() }
#[test]
fn socket_smoke_test_ip4() {
do run_in_newsched_task {
let server_ip = next_test_ip4();
let client_ip = next_test_ip4();
do spawntask_immediately {
match UdpSocket::bind(server_ip) {
Some(server) => {
let mut buf = [0];
match server.recvfrom(buf) {
Some((nread, src)) => {
assert_eq!(nread, 1);
assert_eq!(buf[0], 99);
assert_eq!(src, client_ip);
}
None => fail!()
}
}
None => fail!()
}
}
do spawntask_immediately {
match UdpSocket::bind(client_ip) {
Some(client) => client.sendto([99], server_ip),
None => fail!()
}
}
}
}
#[test]
fn socket_smoke_test_ip6() {
do run_in_newsched_task {
let server_ip = next_test_ip6();
let client_ip = next_test_ip6();
do spawntask_immediately {
match UdpSocket::bind(server_ip) {
Some(server) => {
let mut buf = [0];
match server.recvfrom(buf) {
Some((nread, src)) => {
assert_eq!(nread, 1);
assert_eq!(buf[0], 99);
assert_eq!(src, client_ip);
}
None => fail!()
}
}
None => fail!()
}
}
do spawntask_immediately {
match UdpSocket::bind(client_ip) {
Some(client) => client.sendto([99], server_ip),
None => fail!()
}
}
}
}
#[test]
fn stream_smoke_test_ip4() {
do run_in_newsched_task {
let server_ip = next_test_ip4();
let client_ip = next_test_ip4();
do spawntask_immediately {
match UdpSocket::bind(server_ip) {
Some(server) => {
let server = ~server;
let mut stream = server.connect(client_ip);
let mut buf = [0];
match stream.read(buf) {
Some(nread) => {
assert_eq!(nread, 1);
assert_eq!(buf[0], 99);
}
None => fail!()
}
}
None => fail!()
}
}
do spawntask_immediately {
match UdpSocket::bind(client_ip) {
Some(client) => {
let client = ~client;
let mut stream = client.connect(server_ip);
stream.write([99]);
}
None => fail!()
}
}
}
}
#[test]
fn stream_smoke_test_ip6() {
do run_in_newsched_task {
let server_ip = next_test_ip6();
let client_ip = next_test_ip6();
do spawntask_immediately {
match UdpSocket::bind(server_ip) {
Some(server) => {
let server = ~server;
let mut stream = server.connect(client_ip);
let mut buf = [0];
match stream.read(buf) {
Some(nread) => {
assert_eq!(nread, 1);
assert_eq!(buf[0], 99);
}
None => fail!()
}
}
None => fail!()
}
}
do spawntask_immediately {
match UdpSocket::bind(client_ip) {
Some(client) => {
let client = ~client;
let mut stream = client.connect(server_ip);
stream.write([99]);
}
None => fail!()
}
}
}
}
}
此差异已折叠。
......@@ -13,12 +13,13 @@
use rt::task::Task;
use rt::local_ptr;
use rt::rtio::{EventLoop, IoFactoryObject};
//use borrow::to_uint;
pub trait Local {
fn put(value: ~Self);
fn take() -> ~Self;
fn exists() -> bool;
fn borrow(f: &fn(&mut Self));
fn borrow<T>(f: &fn(&mut Self) -> T) -> T;
unsafe fn unsafe_borrow() -> *mut Self;
unsafe fn try_unsafe_borrow() -> Option<*mut Self>;
}
......@@ -27,23 +28,40 @@ impl Local for Scheduler {
fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }}
fn take() -> ~Scheduler { unsafe { local_ptr::take() } }
fn exists() -> bool { local_ptr::exists() }
fn borrow(f: &fn(&mut Scheduler)) { unsafe { local_ptr::borrow(f) } }
fn borrow<T>(f: &fn(&mut Scheduler) -> T) -> T {
let mut res: Option<T> = None;
let res_ptr: *mut Option<T> = &mut res;
unsafe {
do local_ptr::borrow |sched| {
// rtdebug!("successfully unsafe borrowed sched pointer");
let result = f(sched);
*res_ptr = Some(result);
}
}
match res {
Some(r) => { r }
None => rtabort!("function failed!")
}
}
unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() }
unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { abort!("unimpl") }
unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { rtabort!("unimpl") }
}
impl Local for Task {
fn put(_value: ~Task) { abort!("unimpl") }
fn take() -> ~Task { abort!("unimpl") }
fn exists() -> bool { abort!("unimpl") }
fn borrow(f: &fn(&mut Task)) {
do Local::borrow::<Scheduler> |sched| {
fn put(_value: ~Task) { rtabort!("unimpl") }
fn take() -> ~Task { rtabort!("unimpl") }
fn exists() -> bool { rtabort!("unimpl") }
fn borrow<T>(f: &fn(&mut Task) -> T) -> T {
do Local::borrow::<Scheduler, T> |sched| {
// rtdebug!("sched about to grab current_task");
match sched.current_task {
Some(~ref mut task) => {
f(&mut *task.task)
// rtdebug!("current task pointer: %x", to_uint(task));
// rtdebug!("current task heap pointer: %x", to_uint(&task.heap));
f(task)
}
None => {
abort!("no scheduler")
rtabort!("no scheduler")
}
}
}
......@@ -51,12 +69,12 @@ fn borrow(f: &fn(&mut Task)) {
unsafe fn unsafe_borrow() -> *mut Task {
match (*Local::unsafe_borrow::<Scheduler>()).current_task {
Some(~ref mut task) => {
let s: *mut Task = &mut *task.task;
let s: *mut Task = &mut *task;
return s;
}
None => {
// Don't fail. Infinite recursion
abort!("no scheduler")
rtabort!("no scheduler")
}
}
}
......@@ -71,48 +89,69 @@ unsafe fn try_unsafe_borrow() -> Option<*mut Task> {
// XXX: This formulation won't work once ~IoFactoryObject is a real trait pointer
impl Local for IoFactoryObject {
fn put(_value: ~IoFactoryObject) { abort!("unimpl") }
fn take() -> ~IoFactoryObject { abort!("unimpl") }
fn exists() -> bool { abort!("unimpl") }
fn borrow(_f: &fn(&mut IoFactoryObject)) { abort!("unimpl") }
fn put(_value: ~IoFactoryObject) { rtabort!("unimpl") }
fn take() -> ~IoFactoryObject { rtabort!("unimpl") }
fn exists() -> bool { rtabort!("unimpl") }
fn borrow<T>(_f: &fn(&mut IoFactoryObject) -> T) -> T { rtabort!("unimpl") }
unsafe fn unsafe_borrow() -> *mut IoFactoryObject {
let sched = Local::unsafe_borrow::<Scheduler>();
let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
return io;
}
unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { abort!("unimpl") }
unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { rtabort!("unimpl") }
}
#[cfg(test)]
mod test {
use unstable::run_in_bare_thread;
use rt::test::*;
use rt::sched::Scheduler;
use rt::uv::uvio::UvEventLoop;
use super::*;
#[test]
fn thread_local_scheduler_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
do run_in_bare_thread {
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}
}
#[test]
fn thread_local_scheduler_two_instances() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
do run_in_bare_thread {
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}
}
#[test]
fn borrow_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
unsafe {
let _scheduler: *mut Scheduler = Local::unsafe_borrow();
do run_in_bare_thread {
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
unsafe {
let _scheduler: *mut Scheduler = Local::unsafe_borrow();
}
let _scheduler: ~Scheduler = Local::take();
}
let _scheduler: ~Scheduler = Local::take();
}
#[test]
fn borrow_with_return() {
do run_in_bare_thread {
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let res = do Local::borrow::<Scheduler,bool> |_sched| {
true
};
assert!(res);
let _scheduler: ~Scheduler = Local::take();
}
}
}
......@@ -10,11 +10,24 @@
//! The local, garbage collected heap
use libc;
use libc::{c_void, uintptr_t, size_t};
use ops::Drop;
use repr::BoxRepr;
use rt;
use rt::OldTaskContext;
use rt::local::Local;
use rt::task::Task;
type MemoryRegion = c_void;
type BoxedRegion = c_void;
struct Env { priv opaque: () }
struct BoxedRegion {
env: *Env,
backing_region: *MemoryRegion,
live_allocs: *BoxRepr
}
pub type OpaqueBox = c_void;
pub type TypeDesc = c_void;
......@@ -49,6 +62,12 @@ pub fn alloc(&mut self, td: *TypeDesc, size: uint) -> *OpaqueBox {
}
}
pub fn realloc(&mut self, ptr: *OpaqueBox, size: uint) -> *OpaqueBox {
unsafe {
return rust_boxed_region_realloc(self.boxed_region, ptr, size as size_t);
}
}
pub fn free(&mut self, box: *OpaqueBox) {
unsafe {
return rust_boxed_region_free(self.boxed_region, box);
......@@ -65,6 +84,40 @@ fn drop(&self) {
}
}
// A little compatibility function
pub unsafe fn local_free(ptr: *libc::c_char) {
match rt::context() {
OldTaskContext => {
rust_upcall_free_noswitch(ptr);
extern {
#[fast_ffi]
unsafe fn rust_upcall_free_noswitch(ptr: *libc::c_char);
}
}
_ => {
do Local::borrow::<Task,()> |task| {
task.heap.free(ptr as *libc::c_void);
}
}
}
}
pub fn live_allocs() -> *BoxRepr {
let region = match rt::context() {
OldTaskContext => {
unsafe { rust_current_boxed_region() }
}
_ => {
do Local::borrow::<Task, *BoxedRegion> |task| {
task.heap.boxed_region
}
}
};
return unsafe { (*region).live_allocs };
}
extern {
fn rust_new_memory_region(synchronized: uintptr_t,
detailed_leaks: uintptr_t,
......@@ -76,5 +129,9 @@ fn rust_new_boxed_region(region: *MemoryRegion,
fn rust_boxed_region_malloc(region: *BoxedRegion,
td: *TypeDesc,
size: size_t) -> *OpaqueBox;
fn rust_boxed_region_realloc(region: *BoxedRegion,
ptr: *OpaqueBox,
size: size_t) -> *OpaqueBox;
fn rust_boxed_region_free(region: *BoxedRegion, box: *OpaqueBox);
fn rust_current_boxed_region() -> *BoxedRegion;
}
......@@ -109,7 +109,7 @@ pub unsafe fn unsafe_borrow<T>() -> *mut T {
fn tls_key() -> tls::Key {
match maybe_tls_key() {
Some(key) => key,
None => abort!("runtime tls key not initialized")
None => rtabort!("runtime tls key not initialized")
}
}
......
......@@ -9,6 +9,7 @@
// except according to those terms.
use either::*;
use libc;
pub trait Logger {
fn log(&mut self, msg: Either<~str, &'static str>);
......@@ -20,6 +21,10 @@ impl Logger for StdErrLogger {
fn log(&mut self, msg: Either<~str, &'static str>) {
use io::{Writer, WriterUtil};
if !should_log_console() {
return;
}
let s: &str = match msg {
Left(ref s) => {
let s: &str = *s;
......@@ -44,7 +49,6 @@ pub fn init(crate_map: *u8) {
use str;
use ptr;
use option::{Some, None};
use libc::c_char;
let log_spec = os::getenv("RUST_LOG");
match log_spec {
......@@ -61,8 +65,16 @@ pub fn init(crate_map: *u8) {
}
}
}
}
extern {
fn rust_update_log_settings(crate_map: *u8, settings: *c_char);
}
pub fn console_on() { unsafe { rust_log_console_on() } }
pub fn console_off() { unsafe { rust_log_console_off() } }
fn should_log_console() -> bool { unsafe { rust_should_log_console() != 0 } }
extern {
fn rust_update_log_settings(crate_map: *u8, settings: *libc::c_char);
fn rust_log_console_on();
fn rust_log_console_off();
fn rust_should_log_console() -> libc::uintptr_t;
}
......@@ -8,6 +8,9 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A concurrent queue that supports multiple producers and a
//! single consumer.
use container::Container;
use kinds::Send;
use vec::OwnedVector;
......
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use to_str::ToStr;
// Event counters recording scheduler activity, for runtime diagnostics.
// Each field counts occurrences of one scheduler event; see ToStr below
// for the human-readable dump.
pub struct SchedMetrics {
// The number of times executing `run_sched_once`.
turns: uint,
// The number of turns that received a message.
messages_received: uint,
// The number of turns that ran a task from the queue.
tasks_resumed_from_queue: uint,
// The number of turns that found no work to perform.
wasted_turns: uint,
// The number of times the scheduler went to sleep.
sleepy_times: uint,
// Context switches from the scheduler into a task.
context_switches_sched_to_task: uint,
// Context switches from a task into the scheduler.
context_switches_task_to_sched: uint,
// Context switches from a task to a task.
context_switches_task_to_task: uint,
// Message sends that unblock the receiver
rendezvous_sends: uint,
// Message sends that do not unblock the receiver
non_rendezvous_sends: uint,
// Message receives that do not block the receiver
rendezvous_recvs: uint,
// Message receives that block the receiver
non_rendezvous_recvs: uint,
// JoinLatch releases that create tombstones
release_tombstone: uint,
// JoinLatch releases that do not create tombstones
release_no_tombstone: uint,
}
impl SchedMetrics {
// Create a metrics record with every counter zeroed.
pub fn new() -> SchedMetrics {
SchedMetrics {
turns: 0,
messages_received: 0,
tasks_resumed_from_queue: 0,
wasted_turns: 0,
sleepy_times: 0,
context_switches_sched_to_task: 0,
context_switches_task_to_sched: 0,
context_switches_task_to_task: 0,
rendezvous_sends: 0,
non_rendezvous_sends: 0,
rendezvous_recvs: 0,
non_rendezvous_recvs: 0,
release_tombstone: 0,
release_no_tombstone: 0
}
}
}
// Render every counter as a `name: value` line for debug output. The
// backslash-newline continuations keep the fmt! template on one logical
// string while laying one field per source line.
impl ToStr for SchedMetrics {
fn to_str(&self) -> ~str {
fmt!("turns: %u\n\
messages_received: %u\n\
tasks_resumed_from_queue: %u\n\
wasted_turns: %u\n\
sleepy_times: %u\n\
context_switches_sched_to_task: %u\n\
context_switches_task_to_sched: %u\n\
context_switches_task_to_task: %u\n\
rendezvous_sends: %u\n\
non_rendezvous_sends: %u\n\
rendezvous_recvs: %u\n\
non_rendezvous_recvs: %u\n\
release_tombstone: %u\n\
release_no_tombstone: %u\n\
",
self.turns,
self.messages_received,
self.tasks_resumed_from_queue,
self.wasted_turns,
self.sleepy_times,
self.context_switches_sched_to_task,
self.context_switches_task_to_sched,
self.context_switches_task_to_task,
self.rendezvous_sends,
self.non_rendezvous_sends,
self.rendezvous_recvs,
self.non_rendezvous_recvs,
self.release_tombstone,
self.release_no_tombstone
)
}
}
\ No newline at end of file
......@@ -55,8 +55,27 @@
*/
#[doc(hidden)];
#[deny(unused_imports)];
#[deny(unused_mut)];
#[deny(unused_variable)];
#[deny(unused_unsafe)];
use cell::Cell;
use clone::Clone;
use container::Container;
use iter::Times;
use iterator::IteratorUtil;
use option::Some;
use ptr::RawPtr;
use rt::sched::{Scheduler, Shutdown};
use rt::sleeper_list::SleeperList;
use rt::task::Task;
use rt::thread::Thread;
use rt::work_queue::WorkQueue;
use rt::uv::uvio::UvEventLoop;
use unstable::atomics::{AtomicInt, SeqCst};
use unstable::sync::UnsafeAtomicRcBox;
use vec::{OwnedVector, MutableVector};
/// The global (exchange) heap.
pub mod global_heap;
......@@ -88,6 +107,9 @@
/// A parallel queue.
mod message_queue;
/// A parallel data structure for tracking sleeping schedulers.
mod sleeper_list;
/// Stack segments and caching.
mod stack;
......@@ -101,7 +123,7 @@
pub mod env;
/// The local, managed heap
mod local_heap;
pub mod local_heap;
/// The Logger trait and implementations
pub mod logging;
......@@ -127,6 +149,20 @@
/// Bindings to pthread/windows thread-local storage.
pub mod thread_local_storage;
/// For waiting on child tasks.
pub mod join_latch;
pub mod metrics;
// FIXME #5248 shouldn't be pub
/// Just stuff
pub mod util;
// Global command line argument storage
pub mod args;
// Support for dynamic borrowck
pub mod borrowck;
/// Set up a default runtime configuration, given compiler-supplied arguments.
///
......@@ -142,27 +178,128 @@
/// # Return value
///
/// The return value is used as the process return code. 0 on success, 101 on error.
pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {
pub fn start(argc: int, argv: **u8, crate_map: *u8, main: ~fn()) -> int {
use self::sched::{Scheduler, Coroutine};
use self::uv::uvio::UvEventLoop;
init(argc, argv, crate_map);
let exit_code = run(main);
cleanup();
init(crate_map);
return exit_code;
}
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_);
let main_task = ~Coroutine::new(&mut sched.stack_pool, main);
/// One-time runtime initialization.
///
/// Initializes global state, including frobbing
/// the crate's logging flags, registering GC
/// metadata, and storing the process arguments.
pub fn init(argc: int, argv: **u8, crate_map: *u8) {
// XXX: Derefing these pointers is not safe.
// Need to propagate the unsafety to `start`.
unsafe {
args::init(argc, argv);
logging::init(crate_map);
rust_update_gc_metadata(crate_map);
}
sched.enqueue_task(main_task);
sched.run();
extern {
fn rust_update_gc_metadata(crate_map: *u8);
}
}
return 0;
/// One-time runtime cleanup.
pub fn cleanup() {
args::cleanup();
}
/// One-time runtime initialization. Currently all this does is set up logging
/// based on the RUST_LOG environment variable.
pub fn init(crate_map: *u8) {
logging::init(crate_map);
/// Execute the main function in a scheduler.
///
/// Configures the runtime according to the environment, by default
/// using a task scheduler with the same number of threads as cores.
/// Entry point of the new (libuv-based) runtime: creates one scheduler per
/// available thread, runs `main` as the root task on the first scheduler,
/// and blocks until every scheduler has shut down.
///
/// Returns a process exit code: the programmatically-set global exit status
/// on success, or `DEFAULT_ERROR_CODE` if the main task tree failed.
pub fn run(main: ~fn()) -> int {

    // Exit code reported when the main task (or any task in its tree) fails.
    static DEFAULT_ERROR_CODE: int = 101;

    let nthreads = util::default_sched_threads();

    // The shared list of sleeping schedulers. Schedulers wake each other
    // occasionally to do new work.
    let sleepers = SleeperList::new();

    // The shared work queue. Temporary until work stealing is implemented.
    let work_queue = WorkQueue::new();

    // The schedulers.
    let mut scheds = ~[];
    // Handles to the schedulers. When the main task ends these will be
    // sent the Shutdown message to terminate the schedulers.
    let mut handles = ~[];

    for nthreads.times {
        // Every scheduler is driven by an I/O event loop.
        let loop_ = ~UvEventLoop::new();
        let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
        let handle = sched.make_handle();

        scheds.push(sched);
        handles.push(handle);
    }

    // Create a shared cell for transmitting the process exit
    // code from the main task to this function.
    let exit_code = UnsafeAtomicRcBox::new(AtomicInt::new(0));
    let exit_code_clone = exit_code.clone();

    // When the main task exits, after all the tasks in the main
    // task tree, shut down the schedulers and set the exit code.
    // The Cell moves `handles` into the `on_exit` closure.
    let handles = Cell::new(handles);
    let on_exit: ~fn(bool) = |exit_success| {

        // Ask every scheduler to stop once it runs out of work.
        let mut handles = handles.take();
        for handles.mut_iter().advance |handle| {
            handle.send(Shutdown);
        }

        unsafe {
            let exit_code = if exit_success {
                use rt::util;

                // If we're exiting successfully, then return the global
                // exit status, which can be set programmatically.
                util::get_exit_status()
            } else {
                DEFAULT_ERROR_CODE
            };
            (*exit_code_clone.get()).store(exit_code, SeqCst);
        }
    };

    // Create and enqueue the main task.
    let main_cell = Cell::new(main);
    let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool,
                                        main_cell.take());
    main_task.on_exit = Some(on_exit);
    scheds[0].enqueue_task(main_task);

    // Run each scheduler in a thread.
    let mut threads = ~[];
    while !scheds.is_empty() {
        let sched = scheds.pop();
        let sched_cell = Cell::new(sched);
        let thread = do Thread::start {
            let sched = sched_cell.take();
            sched.run();
        };

        threads.push(thread);
    }

    // Wait for schedulers: dropping the Thread values joins the
    // underlying OS threads.
    { let _threads = threads; }

    // Return the exit code stored by the `on_exit` callback (0 if it was
    // never overwritten).
    unsafe {
        (*exit_code.get()).load(SeqCst)
    }
}
/// Possible contexts in which Rust code may be executing.
......@@ -194,8 +331,8 @@ pub fn context() -> RuntimeContext {
return OldTaskContext;
} else {
if Local::exists::<Scheduler>() {
let context = ::cell::Cell::new_empty();
do Local::borrow::<Scheduler> |sched| {
let context = Cell::new_empty();
do Local::borrow::<Scheduler, ()> |sched| {
if sched.in_task_context() {
context.put_back(TaskContext);
} else {
......@@ -217,24 +354,20 @@ pub fn context() -> RuntimeContext {
#[test]
fn test_context() {
use unstable::run_in_bare_thread;
use self::sched::{Scheduler, Coroutine};
use rt::uv::uvio::UvEventLoop;
use cell::Cell;
use self::sched::{Scheduler};
use rt::local::Local;
use rt::test::new_test_uv_sched;
assert_eq!(context(), OldTaskContext);
do run_in_bare_thread {
assert_eq!(context(), GlobalContext);
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Coroutine::new(&mut sched.stack_pool) {
let mut sched = ~new_test_uv_sched();
let task = ~do Task::new_root(&mut sched.stack_pool) {
assert_eq!(context(), TaskContext);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |task| {
do sched.deschedule_running_task_and_then() |sched, task| {
assert_eq!(context(), SchedulerContext);
let task = Cell::new(task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
sched.enqueue_task(task);
}
};
sched.enqueue_task(task);
......
......@@ -18,28 +18,69 @@
// XXX: ~object doesn't work currently so these are some placeholder
// types to use instead
pub type EventLoopObject = uvio::UvEventLoop;
pub type RemoteCallbackObject = uvio::UvRemoteCallback;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
/// An event loop that drives a scheduler. Currently implemented only by
/// `uvio::UvEventLoop` (see the `EventLoopObject` typedef above).
pub trait EventLoop {
    /// Run the event loop until there is no outstanding work.
    fn run(&mut self);
    /// Schedule a callback to be run on the next turn of the loop.
    fn callback(&mut self, ~fn());
    /// Schedule a callback to be run after `ms` milliseconds.
    fn callback_ms(&mut self, ms: u64, ~fn());
    /// Create a handle that can fire a callback to wake this loop.
    /// NOTE(review): cross-thread use is implied by the name but not
    /// visible here -- confirm against the uv implementation.
    fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
    /// The asynchronous I/O services. Not all event loops may provide one
    fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
}
pub trait RemoteCallback {
    /// Trigger the remote callback. Note that the number of times the callback
    /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
    /// the callback will be called at least once, but multiple callbacks may be coalesced
    /// and callbacks may be called more often than requested. Destruction also triggers the
    /// callback.
    fn fire(&mut self);
}
/// Factory for the asynchronous I/O primitives provided by an event loop.
pub trait IoFactory {
    /// Open a TCP connection to `addr`.
    fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
    /// Bind a TCP listener to `addr`.
    fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;
    /// Bind a UDP socket to `addr`.
    fn udp_bind(&mut self, addr: IpAddr) -> Result<~RtioUdpSocketObject, IoError>;
}
pub trait RtioTcpListener {
pub trait RtioTcpListener : RtioSocket {
    /// Block until an incoming connection arrives.
    fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
    // Presumably toggles libuv's simultaneous-accepts option --
    // TODO(review): confirm exact semantics in uvio.
    fn accept_simultaneously(&self);
    fn dont_accept_simultaneously(&self);
}
pub trait RtioTcpStream : RtioSocket {
    /// Read up to `buf.len()` bytes into `buf`; returns the number read.
    fn read(&self, buf: &mut [u8]) -> Result<uint, IoError>;
    /// Write the entire buffer.
    fn write(&self, buf: &[u8]) -> Result<(), IoError>;
    /// Address of the remote peer.
    fn peer_name(&self) -> IpAddr;
    fn control_congestion(&self);
    // Presumably sets TCP_NODELAY -- confirm in the uv implementation.
    fn nodelay(&self);
    /// Enable TCP keepalive with the given delay.
    fn keepalive(&self, delay_in_seconds: uint);
    fn letdie(&self);
}
/// Operations common to all runtime I/O sockets.
pub trait RtioSocket {
    /// The local address this socket is bound to.
    fn socket_name(&self) -> IpAddr;
}
pub trait RtioTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
pub trait RtioUdpSocket : RtioSocket {
    /// Receive a datagram; returns bytes received and the sender's address.
    fn recvfrom(&self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError>;
    /// Send `buf` as a datagram to `dst`.
    fn sendto(&self, buf: &[u8], dst: IpAddr) -> Result<(), IoError>;
    fn join_multicast(&self, multi: IpAddr);
    fn leave_multicast(&self, multi: IpAddr);
    // Toggle whether locally-sent multicast is looped back to this socket.
    fn loop_multicast_locally(&self);
    fn dont_loop_multicast_locally(&self);
    fn multicast_time_to_live(&self, ttl: int);
    fn time_to_live(&self, ttl: int);
    // Toggle the SO_BROADCAST-style option -- TODO(review): confirm in uvio.
    fn hear_broadcasts(&self);
    fn ignore_broadcasts(&self);
}
此差异已折叠。
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Maintains a shared list of sleeping schedulers. Schedulers
//! use this to wake each other up.
use container::Container;
use vec::OwnedVector;
use option::{Option, Some, None};
use cell::Cell;
use unstable::sync::{Exclusive, exclusive};
use rt::sched::SchedHandle;
use clone::Clone;
/// A shared stack of handles to sleeping schedulers. A scheduler pushes
/// its handle before going to sleep; others pop a handle to wake a
/// sleeper when new work arrives.
pub struct SleeperList {
    // Exclusive provides mutual exclusion around the underlying vector.
    priv stack: ~Exclusive<~[SchedHandle]>
}
impl SleeperList {
    /// Create an empty sleeper list.
    pub fn new() -> SleeperList {
        SleeperList {
            stack: ~exclusive(~[])
        }
    }

    /// Record `handle` as a sleeping scheduler.
    pub fn push(&mut self, handle: SchedHandle) {
        // The Cell moves `handle` into the `with` closure.
        let handle = Cell::new(handle);
        unsafe {
            self.stack.with(|s| s.push(handle.take()));
        }
    }

    /// Take the most recently pushed sleeper, if any.
    pub fn pop(&mut self) -> Option<SchedHandle> {
        unsafe {
            do self.stack.with |s| {
                if !s.is_empty() {
                    Some(s.pop())
                } else {
                    None
                }
            }
        }
    }
}
impl Clone for SleeperList {
    /// Clones share the same underlying Exclusive stack.
    fn clone(&self) -> SleeperList {
        SleeperList {
            stack: self.stack.clone()
        }
    }
}
\ No newline at end of file
......@@ -15,20 +15,45 @@
use borrow;
use cast::transmute;
use cleanup;
use libc::{c_void, uintptr_t};
use ptr;
use prelude::*;
use option::{Option, Some, None};
use rt::local::Local;
use rt::logging::StdErrLogger;
use super::local_heap::LocalHeap;
use rt::sched::{Scheduler, SchedHandle};
use rt::join_latch::JoinLatch;
use rt::stack::{StackSegment, StackPool};
use rt::context::Context;
use cell::Cell;
pub struct Task {
heap: LocalHeap,
gc: GarbageCollector,
storage: LocalStorage,
logger: StdErrLogger,
unwinder: Option<Unwinder>,
destroyed: bool
unwinder: Unwinder,
home: Option<SchedHome>,
join_latch: Option<~JoinLatch>,
on_exit: Option<~fn(bool)>,
destroyed: bool,
coroutine: Option<~Coroutine>
}
pub struct Coroutine {
/// The segment of stack on which the task is currently running or
/// if the task is blocked, on which the task will resume
/// execution.
priv current_stack_segment: StackSegment,
/// Always valid if the task is alive and not running.
saved_context: Context
}
pub enum SchedHome {
AnySched,
Sched(SchedHandle)
}
pub struct GarbageCollector;
......@@ -39,73 +64,227 @@ pub struct Unwinder {
}
impl Task {
pub fn new() -> Task {
pub fn new_root(stack_pool: &mut StackPool,
start: ~fn()) -> Task {
Task::new_root_homed(stack_pool, AnySched, start)
}
pub fn new_child(&mut self,
stack_pool: &mut StackPool,
start: ~fn()) -> Task {
self.new_child_homed(stack_pool, AnySched, start)
}
pub fn new_root_homed(stack_pool: &mut StackPool,
home: SchedHome,
start: ~fn()) -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: StdErrLogger,
unwinder: Some(Unwinder { unwinding: false }),
destroyed: false
unwinder: Unwinder { unwinding: false },
home: Some(home),
join_latch: Some(JoinLatch::new_root()),
on_exit: None,
destroyed: false,
coroutine: Some(~Coroutine::new(stack_pool, start))
}
}
pub fn without_unwinding() -> Task {
pub fn new_child_homed(&mut self,
stack_pool: &mut StackPool,
home: SchedHome,
start: ~fn()) -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: StdErrLogger,
unwinder: None,
destroyed: false
home: Some(home),
unwinder: Unwinder { unwinding: false },
join_latch: Some(self.join_latch.get_mut_ref().new_child()),
on_exit: None,
destroyed: false,
coroutine: Some(~Coroutine::new(stack_pool, start))
}
}
/// Change this task's scheduler affinity to `new_home`.
pub fn give_home(&mut self, new_home: SchedHome) {
    self.home = Some(new_home);
}
pub fn run(&mut self, f: &fn()) {
// This is just an assertion that `run` was called unsafely
// and this instance of Task is still accessible.
do Local::borrow::<Task> |task| {
do Local::borrow::<Task, ()> |task| {
assert!(borrow::ref_eq(task, self));
}
match self.unwinder {
Some(ref mut unwinder) => {
// If there's an unwinder then set up the catch block
unwinder.try(f);
self.unwinder.try(f);
self.destroy();
// Wait for children. Possibly report the exit status.
let local_success = !self.unwinder.unwinding;
let join_latch = self.join_latch.swap_unwrap();
match self.on_exit {
Some(ref on_exit) => {
let success = join_latch.wait(local_success);
(*on_exit)(success);
}
None => {
// Otherwise, just run the body
f()
join_latch.release(local_success);
}
}
self.destroy();
}
/// Must be called manually before finalization to clean up
/// must be called manually before finalization to clean up
/// thread-local resources. Some of the routines here expect
/// Task to be available recursively so this must be
/// called unsafely, without removing Task from
/// thread-local-storage.
fn destroy(&mut self) {
// This is just an assertion that `destroy` was called unsafely
// and this instance of Task is still accessible.
do Local::borrow::<Task> |task| {
do Local::borrow::<Task, ()> |task| {
assert!(borrow::ref_eq(task, self));
}
match self.storage {
LocalStorage(ptr, Some(ref dtor)) => {
(*dtor)(ptr)
}
_ => ()
}
// Destroy remaining boxes
unsafe { cleanup::annihilate(); }
self.destroyed = true;
}
/// Check if *task* is currently home: compares this task's pinned
/// scheduler id against the scheduler in thread-local storage. A task
/// homed to `AnySched` is never considered "home"; a `None` home aborts.
pub fn is_home(&self) -> bool {
    do Local::borrow::<Scheduler,bool> |sched| {
        match self.home {
            Some(AnySched) => { false }
            Some(Sched(SchedHandle { sched_id: ref id, _ })) => {
                *id == sched.sched_id()
            }
            None => { rtabort!("task home of None") }
        }
    }
}
/// Like `is_home`, but compares against an explicitly supplied scheduler
/// instead of borrowing one from thread-local storage.
pub fn is_home_no_tls(&self, sched: &~Scheduler) -> bool {
    match self.home {
        Some(AnySched) => { false }
        Some(Sched(SchedHandle { sched_id: ref id, _ })) => {
            *id == sched.sched_id()
        }
        None => {rtabort!("task home of None") }
    }
}
/// Check whether the *current* task (borrowed from thread-local storage)
/// is homed on the scheduler identified by `sched_id`.
pub fn is_home_using_id(sched_id: uint) -> bool {
    do Local::borrow::<Task,bool> |task| {
        match task.home {
            Some(Sched(SchedHandle { sched_id: ref id, _ })) => {
                *id == sched_id
            }
            Some(AnySched) => { false }
            None => { rtabort!("task home of None") }
        }
    }
}
/// Check if this *task* has a home, i.e. is pinned to a particular
/// scheduler. Aborts the runtime if the home field is `None`.
pub fn homed(&self) -> bool {
    match self.home {
        Some(Sched(_)) => true,
        Some(AnySched) => false,
        None => rtabort!("task home of None")
    }
}
/// On a special scheduler? True when the scheduler in thread-local
/// storage does not accept arbitrary tasks (its `run_anything` is false).
pub fn on_special() -> bool {
    do Local::borrow::<Scheduler,bool> |sched| {
        !sched.run_anything
    }
}
}
impl Drop for Task {
    // A Task must be torn down via `destroy` before being dropped;
    // dropping a still-live task is a runtime bug.
    fn drop(&self) { assert!(self.destroyed) }
}
// Coroutines represent nothing more than a context and a stack
// segment.
impl Coroutine {

    /// Create a coroutine whose first resumption will execute `start`
    /// (via the wrapper built by `build_start_wrapper`).
    pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
        static MIN_STACK_SIZE: uint = 100000; // XXX: Too much stack

        let start = Coroutine::build_start_wrapper(start);
        let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
        let initial_context = Context::new(start, &mut stack);
        Coroutine {
            current_stack_segment: stack,
            saved_context: initial_context
        }
    }

    /// Wrap `start` with the scheduler bookkeeping that must run on the
    /// new stack: finish the previous context's cleanup job, run the
    /// task body, then hand control back to the scheduler to terminate.
    fn build_start_wrapper(start: ~fn()) -> ~fn() {
        let start_cell = Cell::new(start);
        let wrapper: ~fn() = || {
            // First code after swap to this new context. Run our
            // cleanup job.
            unsafe {
                let sched = Local::unsafe_borrow::<Scheduler>();
                (*sched).run_cleanup_job();

                let sched = Local::unsafe_borrow::<Scheduler>();
                let task = (*sched).current_task.get_mut_ref();
                do task.run {
                    // N.B. Removing `start` from the start wrapper
                    // closure by emptying a cell is critical for
                    // correctness. The ~Task pointer, and in turn the
                    // closure used to initialize the first call
                    // frame, is destroyed in the scheduler context,
                    // not task context. So any captured closures must
                    // not contain user-definable dtors that expect to
                    // be in task context. By moving `start` out of
                    // the closure, all the user code goes out of
                    // scope while the task is still running.
                    let start = start_cell.take();
                    start();
                };
            }

            // The task body has returned; yield to the scheduler,
            // which will clean up this coroutine.
            let sched = Local::take::<Scheduler>();
            sched.terminate_current_task();
        };
        return wrapper;
    }

    /// Destroy coroutine and try to reuse stack segment.
    pub fn recycle(~self, stack_pool: &mut StackPool) {
        match self {
            ~Coroutine { current_stack_segment, _ } => {
                stack_pool.give_segment(current_stack_segment);
            }
        }
    }
}
// Just a sanity check to make sure we are catching a Rust-thrown exception
static UNWIND_TOKEN: uintptr_t = 839147;
......@@ -184,8 +363,10 @@ fn key2(_x: @~str) { }
fn unwind() {
    do run_in_newsched_task() {
        // A task body that returns normally yields Ok...
        let result = spawntask_try(||());
        rtdebug!("trying first assert");
        assert!(result.is_ok());

        // ...and a failing body unwinds and yields Err.
        let result = spawntask_try(|| fail!());
        rtdebug!("trying second assert");
        assert!(result.is_err());
    }
}
......@@ -227,4 +408,67 @@ fn comm_stream() {
assert!(port.recv() == 10);
}
}
#[test]
fn comm_shared_chan() {
    use comm::*;

    // Sanity check that SharedChan round-trips a value under the new
    // scheduler.
    do run_in_newsched_task() {
        let (port, chan) = stream();
        let chan = SharedChan::new(chan);
        chan.send(10);
        assert!(port.recv() == 10);
    }
}
#[test]
fn linked_failure() {
    do run_in_newsched_task() {
        // Failure in a linked child task should propagate to the spawner.
        let res = do spawntask_try {
            spawntask_random(|| fail!());
        };
        assert!(res.is_err());
    }
}
#[test]
fn heap_cycles() {
    use option::{Option, Some, None};

    // A task must be able to exit cleanly with a cyclic @-box graph
    // still alive (exercises the box annihilator in task teardown).
    do run_in_newsched_task {
        struct List {
            next: Option<@mut List>,
        }

        let a = @mut List { next: None };
        let b = @mut List { next: Some(a) };

        // Create a cycle.
        a.next = Some(b);
    }
}
// XXX: This is a copy of test_future_result in std::task.
// It can be removed once the scheduler is turned on by default.
#[test]
fn future_result() {
    do run_in_newsched_task {
        use option::{Some, None};
        use task::*;

        // A task that completes normally reports Success on its
        // future_result port.
        let mut result = None;
        let mut builder = task();
        builder.future_result(|r| result = Some(r));
        do builder.spawn {}
        assert_eq!(result.unwrap().recv(), Success);

        // An unlinked failing task reports Failure without taking down
        // this task.
        result = None;
        let mut builder = task();
        builder.future_result(|r| result = Some(r));
        builder.unlinked();
        do builder.spawn {
            fail!();
        }
        assert_eq!(result.unwrap().recv(), Failure);
    }
}
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -20,7 +20,6 @@
use clone::Clone;
use iterator::{FromIterator, Iterator, IteratorUtil};
use kinds::Copy;
use libc;
use libc::c_void;
use num::Zero;
use option::{None, Option, Some};
......@@ -33,17 +32,12 @@
use uint;
use unstable::intrinsics;
#[cfg(stage0)]
use intrinsic::{get_tydesc, TyDesc};
use intrinsic::{get_tydesc};
#[cfg(not(stage0))]
use unstable::intrinsics::{get_tydesc, contains_managed, TyDesc};
use unstable::intrinsics::{get_tydesc, contains_managed};
use vec;
use util;
extern {
#[fast_ffi]
unsafe fn vec_reserve_shared_actual(t: *TyDesc, v: **raw::VecRepr, n: libc::size_t);
}
/// Returns true if two vectors have the same length
pub fn same_length<T, U>(xs: &[T], ys: &[U]) -> bool {
xs.len() == ys.len()
......@@ -1139,7 +1133,9 @@ fn reserve(&mut self, n: uint) {
let td = get_tydesc::<T>();
if ((**ptr).box_header.ref_count ==
managed::raw::RC_MANAGED_UNIQUE) {
vec_reserve_shared_actual(td, ptr as **raw::VecRepr, n as libc::size_t);
// XXX transmute shouldn't be necessary
let td = cast::transmute(td);
::at_vec::raw::reserve_raw(td, ptr, n);
} else {
let alloc = n * sys::nonzero_size_of::<T>();
*ptr = realloc_raw(*ptr as *mut c_void, alloc + size_of::<raw::VecRepr>())
......@@ -1169,7 +1165,7 @@ fn reserve(&mut self, n: uint) {
let ptr: *mut *mut raw::VecRepr = cast::transmute(self);
let td = get_tydesc::<T>();
if contains_managed::<T>() {
vec_reserve_shared_actual(td, ptr as **raw::VecRepr, n as libc::size_t);
::at_vec::raw::reserve_raw(td, ptr, n);
} else {
let alloc = n * sys::nonzero_size_of::<T>();
let size = alloc + size_of::<raw::VecRepr>();
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册