Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
int
Rust
提交
d6133817
R
Rust
项目概览
int
/
Rust
11 个月 前同步成功
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rust
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d6133817
编写于
11月 26, 2014
作者:
N
Niko Matsakis
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Rewrite threading infrastructure, introducing `Thunk` to represent
boxed `FnOnce` closures.
上级
10ac5b72
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
225 addition
and
124 deletion
+225
-124
src/librustrt/at_exit_imp.rs
src/librustrt/at_exit_imp.rs
+4
-3
src/librustrt/lib.rs
src/librustrt/lib.rs
+3
-2
src/librustrt/task.rs
src/librustrt/task.rs
+16
-9
src/librustrt/thread.rs
src/librustrt/thread.rs
+35
-23
src/librustrt/thunk.rs
src/librustrt/thunk.rs
+52
-0
src/libstd/lib.rs
src/libstd/lib.rs
+2
-0
src/libstd/rt/mod.rs
src/libstd/rt/mod.rs
+5
-4
src/libstd/sync/future.rs
src/libstd/sync/future.rs
+18
-13
src/libstd/sync/task_pool.rs
src/libstd/sync/task_pool.rs
+9
-7
src/libstd/sys/unix/process.rs
src/libstd/sys/unix/process.rs
+11
-5
src/libstd/task.rs
src/libstd/task.rs
+70
-58
未找到文件。
src/librustrt/at_exit_imp.rs
浏览文件 @
d6133817
...
...
@@ -18,10 +18,11 @@
use
collections
::
vec
::
Vec
;
use
core
::
atomic
;
use
core
::
mem
;
use
thunk
::{
Thunk
};
use
exclusive
::
Exclusive
;
type
Queue
=
Exclusive
<
Vec
<
proc
():
Send
>>
;
type
Queue
=
Exclusive
<
Vec
<
Thunk
>>
;
static
QUEUE
:
atomic
::
AtomicUint
=
atomic
::
INIT_ATOMIC_UINT
;
static
RUNNING
:
atomic
::
AtomicBool
=
atomic
::
INIT_ATOMIC_BOOL
;
...
...
@@ -34,7 +35,7 @@ pub fn init() {
}
}
pub
fn
push
(
f
:
proc
():
Send
)
{
pub
fn
push
(
f
:
Thunk
)
{
unsafe
{
// Note that the check against 0 for the queue pointer is not atomic at
// all with respect to `run`, meaning that this could theoretically be a
...
...
@@ -59,6 +60,6 @@ pub fn run() {
};
for
to_run
in
cur
.into_iter
()
{
to_run
(
);
to_run
.invoke
(()
);
}
}
src/librustrt/lib.rs
浏览文件 @
d6133817
...
...
@@ -46,6 +46,7 @@
mod
util
;
mod
libunwind
;
mod
stack_overflow
;
pub
mod
thunk
;
pub
mod
args
;
pub
mod
bookkeeping
;
...
...
@@ -95,8 +96,8 @@ pub fn init(argc: int, argv: *const *const u8) {
///
/// It is forbidden for procedures to register more `at_exit` handlers when they
/// are running, and doing so will lead to a process abort.
pub
fn
at_exit
(
f
:
proc
():
Send
)
{
at_exit_imp
::
push
(
f
);
pub
fn
at_exit
<
F
:
FnOnce
()
+
Send
>
(
f
:
F
)
{
at_exit_imp
::
push
(
thunk
::
Thunk
::
new
(
f
)
);
}
/// One-time runtime cleanup.
...
...
src/librustrt/task.rs
浏览文件 @
d6133817
...
...
@@ -21,6 +21,7 @@
use
core
::
atomic
::{
AtomicUint
,
SeqCst
};
use
core
::
iter
::{
IteratorExt
,
Take
};
use
core
::
kinds
::
marker
;
use
core
::
ops
::
FnOnce
;
use
core
::
mem
;
use
core
::
ops
::
FnMut
;
use
core
::
prelude
::{
Clone
,
Drop
,
Err
,
Iterator
,
None
,
Ok
,
Option
,
Send
,
Some
};
...
...
@@ -34,6 +35,7 @@
use
unwind
;
use
unwind
::
Unwinder
;
use
collections
::
str
::
SendStr
;
use
thunk
::
Thunk
;
/// State associated with Rust tasks.
///
...
...
@@ -67,7 +69,7 @@ enum TaskState {
pub
struct
TaskOpts
{
/// Invoke this procedure with the result of the task when it finishes.
pub
on_exit
:
Option
<
proc
(
Result
):
Send
>
,
pub
on_exit
:
Option
<
Thunk
<
Result
>
>
,
/// A name for the task-to-be, for identification in panic messages
pub
name
:
Option
<
SendStr
>
,
/// The size of the stack for the spawned task
...
...
@@ -92,7 +94,7 @@ pub enum BlockedTask {
/// Per-task state related to task death, killing, panic, etc.
pub
struct
Death
{
pub
on_exit
:
Option
<
proc
(
Result
):
Send
>
,
pub
on_exit
:
Option
<
Thunk
<
Result
>
>
,
marker
:
marker
::
NoCopy
,
}
...
...
@@ -116,7 +118,13 @@ pub fn new(stack_bounds: Option<(uint, uint)>, stack_guard: Option<uint>) -> Tas
}
}
pub
fn
spawn
(
opts
:
TaskOpts
,
f
:
proc
():
Send
)
{
pub
fn
spawn
<
F
>
(
opts
:
TaskOpts
,
f
:
F
)
where
F
:
FnOnce
(),
F
:
Send
{
Task
::
spawn_thunk
(
opts
,
Thunk
::
new
(
f
))
}
fn
spawn_thunk
(
opts
:
TaskOpts
,
f
:
Thunk
)
{
let
TaskOpts
{
name
,
stack_size
,
on_exit
}
=
opts
;
let
mut
task
=
box
Task
::
new
(
None
,
None
);
...
...
@@ -138,7 +146,7 @@ pub fn spawn(opts: TaskOpts, f: proc():Send) {
// because by the time that this function is executing we've already
// consumed at least a little bit of stack (we don't know the exact byte
// address at which our stack started).
Thread
::
spawn_stack
(
stack
,
proc
()
{
Thread
::
spawn_stack
(
stack
,
move
||
{
let
something_around_the_top_of_the_stack
=
1
;
let
addr
=
&
something_around_the_top_of_the_stack
as
*
const
int
;
let
my_stack
=
addr
as
uint
;
...
...
@@ -150,7 +158,7 @@ pub fn spawn(opts: TaskOpts, f: proc():Send) {
task
.stack_bounds
=
(
my_stack
-
stack
+
1024
,
my_stack
);
let
mut
f
=
Some
(
f
);
drop
(
task
.run
(||
{
f
.take
()
.unwrap
()
(
)
})
.destroy
());
drop
(
task
.run
(||
{
f
.take
()
.unwrap
()
.invoke
(()
)
})
.destroy
());
drop
(
token
);
})
}
...
...
@@ -241,7 +249,7 @@ fn cleanup(mut self: Box<Task>, result: Result) -> Box<Task> {
// reconsideration to whether it's a reasonable thing to let a
// task to do or not.
match
what_to_do
{
Some
(
f
)
=>
{
f
(
result
)
}
Some
(
f
)
=>
{
f
.invoke
(
result
)
}
None
=>
{
drop
(
result
)
}
}
...
...
@@ -500,14 +508,13 @@ mod test {
use
super
::
*
;
use
std
::
prelude
::
*
;
use
std
::
task
;
use
unwind
;
#[test]
fn
unwind
()
{
let
result
=
task
::
try
(
proc
()
());
let
result
=
task
::
try
(
move
||
());
rtdebug!
(
"trying first assert"
);
assert
!
(
result
.is_ok
());
let
result
=
task
::
try
::
<
()
>
(
proc
()
panic!
());
let
result
=
task
::
try
(
move
||
->
()
panic!
());
rtdebug!
(
"trying second assert"
);
assert
!
(
result
.is_err
());
}
...
...
src/librustrt/thread.rs
浏览文件 @
d6133817
...
...
@@ -22,6 +22,7 @@
use
core
::
mem
;
use
core
::
uint
;
use
libc
;
use
thunk
::{
Thunk
};
use
stack
;
use
stack_overflow
;
...
...
@@ -60,8 +61,8 @@ fn start_thread(main: *mut libc::c_void) -> imp::rust_thread_return {
unsafe
{
stack
::
record_os_managed_stack_bounds
(
0
,
uint
::
MAX
);
let
handler
=
stack_overflow
::
Handler
::
new
();
let
f
:
Box
<
proc
()
>
=
mem
::
transmute
(
main
);
(
*
f
)(
);
let
f
:
Box
<
Thunk
>
=
mem
::
transmute
(
main
);
f
.invoke
(()
);
drop
(
handler
);
mem
::
transmute
(
0
as
imp
::
rust_thread_return
)
}
...
...
@@ -113,14 +114,17 @@ impl Thread<()> {
/// to finish executing. This means that even if `join` is not explicitly
/// called, when the `Thread` falls out of scope its destructor will block
/// waiting for the OS thread.
pub
fn
start
<
T
:
Send
>
(
main
:
proc
():
Send
->
T
)
->
Thread
<
T
>
{
pub
fn
start
<
T
,
F
>
(
main
:
F
)
->
Thread
<
T
>
where
T
:
Send
,
F
:
FnOnce
()
->
T
,
F
:
Send
{
Thread
::
start_stack
(
DEFAULT_STACK_SIZE
,
main
)
}
/// Performs the same functionality as `start`, but specifies an explicit
/// stack size for the new thread.
pub
fn
start_stack
<
T
:
Send
>
(
stack
:
uint
,
main
:
proc
():
Send
->
T
)
->
Thread
<
T
>
{
pub
fn
start_stack
<
T
,
F
>
(
stack
:
uint
,
main
:
F
)
->
Thread
<
T
>
where
T
:
Send
,
F
:
FnOnce
()
->
T
,
F
:
Send
{
// We need the address of the packet to fill in to be stable so when
// `main` fills it in it's still valid, so allocate an extra box to do
// so.
...
...
@@ -128,8 +132,11 @@ pub fn start_stack<T: Send>(stack: uint, main: proc():Send -> T) -> Thread<T> {
let
packet2
:
*
mut
Option
<
T
>
=
unsafe
{
*
mem
::
transmute
::
<&
Box
<
Option
<
T
>>
,
*
const
*
mut
Option
<
T
>>
(
&
packet
)
};
let
main
=
proc
()
unsafe
{
*
packet2
=
Some
(
main
());
};
let
native
=
unsafe
{
imp
::
create
(
stack
,
box
main
)
};
let
native
=
unsafe
{
imp
::
create
(
stack
,
Thunk
::
new
(
move
|:|
{
*
packet2
=
Some
(
main
.call_once
(()));
}))
};
Thread
{
native
:
native
,
...
...
@@ -144,15 +151,19 @@ pub fn start_stack<T: Send>(stack: uint, main: proc():Send -> T) -> Thread<T> {
/// This corresponds to creating threads in the 'detached' state on unix
/// systems. Note that platforms may not keep the main program alive even if
/// there are detached thread still running around.
pub
fn
spawn
(
main
:
proc
():
Send
)
{
pub
fn
spawn
<
F
>
(
main
:
F
)
where
F
:
FnOnce
()
+
Send
{
Thread
::
spawn_stack
(
DEFAULT_STACK_SIZE
,
main
)
}
/// Performs the same functionality as `spawn`, but explicitly specifies a
/// stack size for the new thread.
pub
fn
spawn_stack
(
stack
:
uint
,
main
:
proc
():
Send
)
{
pub
fn
spawn_stack
<
F
>
(
stack
:
uint
,
main
:
F
)
where
F
:
FnOnce
()
+
Send
{
unsafe
{
let
handle
=
imp
::
create
(
stack
,
box
main
);
let
handle
=
imp
::
create
(
stack
,
Thunk
::
new
(
main
)
);
imp
::
detach
(
handle
);
}
}
...
...
@@ -190,8 +201,6 @@ fn drop(&mut self) {
#[cfg(windows)]
#[allow(non_snake_case)]
mod
imp
{
use
core
::
prelude
::
*
;
use
alloc
::
boxed
::
Box
;
use
core
::
cmp
;
use
core
::
mem
;
...
...
@@ -200,6 +209,7 @@ mod imp {
use
libc
::
types
::
os
::
arch
::
extra
::{
LPSECURITY_ATTRIBUTES
,
SIZE_T
,
BOOL
,
LPVOID
,
DWORD
,
LPDWORD
,
HANDLE
};
use
stack
::
RED_ZONE
;
use
thunk
::
Thunk
;
pub
type
rust_thread
=
HANDLE
;
pub
type
rust_thread_return
=
DWORD
;
...
...
@@ -217,8 +227,9 @@ pub unsafe fn init() {
}
}
pub
unsafe
fn
create
(
stack
:
uint
,
p
:
Box
<
proc
():
Send
>
)
->
rust_thread
{
let
arg
:
*
mut
libc
::
c_void
=
mem
::
transmute
(
p
);
pub
unsafe
fn
create
(
stack
:
uint
,
p
:
Thunk
)
->
rust_thread
{
let
arg
:
*
mut
libc
::
c_void
=
mem
::
transmute
(
box
p
);
// FIXME On UNIX, we guard against stack sizes that are too small but
// that's because pthreads enforces that stacks are at least
// PTHREAD_STACK_MIN bytes big. Windows has no such lower limit, it's
...
...
@@ -234,7 +245,7 @@ pub unsafe fn create(stack: uint, p: Box<proc():Send>) -> rust_thread {
if
ret
as
uint
==
0
{
// be sure to not leak the closure
let
_
p
:
Box
<
proc
():
Send
>
=
mem
::
transmute
(
arg
);
let
_
p
:
Box
<
Thunk
>
=
mem
::
transmute
(
arg
);
panic!
(
"failed to spawn native thread: {}"
,
ret
);
}
return
ret
;
...
...
@@ -279,6 +290,7 @@ mod imp {
use
core
::
ptr
;
use
libc
::
consts
::
os
::
posix01
::{
PTHREAD_CREATE_JOINABLE
,
PTHREAD_STACK_MIN
};
use
libc
;
use
thunk
::
Thunk
;
use
stack
::
RED_ZONE
;
...
...
@@ -409,7 +421,7 @@ pub unsafe fn current() -> uint {
}
}
pub
unsafe
fn
create
(
stack
:
uint
,
p
:
Box
<
proc
():
Send
>
)
->
rust_thread
{
pub
unsafe
fn
create
(
stack
:
uint
,
p
:
Thunk
)
->
rust_thread
{
let
mut
native
:
libc
::
pthread_t
=
mem
::
zeroed
();
let
mut
attr
:
libc
::
pthread_attr_t
=
mem
::
zeroed
();
assert_eq!
(
pthread_attr_init
(
&
mut
attr
),
0
);
...
...
@@ -437,13 +449,13 @@ pub unsafe fn create(stack: uint, p: Box<proc():Send>) -> rust_thread {
},
};
let
arg
:
*
mut
libc
::
c_void
=
mem
::
transmute
(
p
);
let
arg
:
*
mut
libc
::
c_void
=
mem
::
transmute
(
box
p
);
// must box since sizeof(p)=2*uint
let
ret
=
pthread_create
(
&
mut
native
,
&
attr
,
super
::
thread_start
,
arg
);
assert_eq!
(
pthread_attr_destroy
(
&
mut
attr
),
0
);
if
ret
!=
0
{
// be sure to not leak the closure
let
_
p
:
Box
<
proc
():
Send
>
=
mem
::
transmute
(
arg
);
let
_
p
:
Box
<
Box
<
FnOnce
()
+
Send
>
>
=
mem
::
transmute
(
arg
);
panic!
(
"failed to spawn native thread: {}"
,
ret
);
}
native
...
...
@@ -531,17 +543,17 @@ mod tests {
use
super
::
Thread
;
#[test]
fn
smoke
()
{
Thread
::
start
(
proc
()
{})
.join
();
}
fn
smoke
()
{
Thread
::
start
(
move
||
{})
.join
();
}
#[test]
fn
data
()
{
assert_eq!
(
Thread
::
start
(
proc
()
{
1
i
})
.join
(),
1
);
}
fn
data
()
{
assert_eq!
(
Thread
::
start
(
move
||
{
1
i
})
.join
(),
1
);
}
#[test]
fn
detached
()
{
Thread
::
spawn
(
proc
()
{})
}
fn
detached
()
{
Thread
::
spawn
(
move
||
{})
}
#[test]
fn
small_stacks
()
{
assert_eq!
(
42
i
,
Thread
::
start_stack
(
0
,
proc
()
42
i
)
.join
());
assert_eq!
(
42
i
,
Thread
::
start_stack
(
1
,
proc
()
42
i
)
.join
());
assert_eq!
(
42
i
,
Thread
::
start_stack
(
0
,
move
||
42
i
)
.join
());
assert_eq!
(
42
i
,
Thread
::
start_stack
(
1
,
move
||
42
i
)
.join
());
}
}
src/librustrt/thunk.rs
0 → 100644
浏览文件 @
d6133817
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use
alloc
::
boxed
::
Box
;
use
core
::
kinds
::
Send
;
use
core
::
ops
::
FnOnce
;
pub
struct
Thunk
<
A
=
(),
R
=
()
>
{
invoke
:
Box
<
Invoke
<
A
,
R
>+
Send
>
}
impl
<
R
>
Thunk
<
(),
R
>
{
pub
fn
new
<
F
>
(
func
:
F
)
->
Thunk
<
(),
R
>
where
F
:
FnOnce
()
->
R
,
F
:
Send
{
Thunk
::
with_arg
(
move
|:
()|
func
())
}
}
impl
<
A
,
R
>
Thunk
<
A
,
R
>
{
pub
fn
with_arg
<
F
>
(
func
:
F
)
->
Thunk
<
A
,
R
>
where
F
:
FnOnce
(
A
)
->
R
,
F
:
Send
{
Thunk
{
invoke
:
box
func
}
}
pub
fn
invoke
(
self
,
arg
:
A
)
->
R
{
self
.invoke
.invoke
(
arg
)
}
}
pub
trait
Invoke
<
A
=
(),
R
=
()
>
{
fn
invoke
(
self
:
Box
<
Self
>
,
arg
:
A
)
->
R
;
}
impl
<
A
,
R
,
F
>
Invoke
<
A
,
R
>
for
F
where
F
:
FnOnce
(
A
)
->
R
{
fn
invoke
(
self
:
Box
<
F
>
,
arg
:
A
)
->
R
{
let
f
=
*
self
;
f
(
arg
)
}
}
src/libstd/lib.rs
浏览文件 @
d6133817
...
...
@@ -171,6 +171,8 @@
pub
use
unicode
::
char
;
pub
use
rustrt
::
thunk
;
/* Exported macros */
pub
mod
macros
;
...
...
src/libstd/rt/mod.rs
浏览文件 @
d6133817
...
...
@@ -52,6 +52,7 @@
use
failure
;
use
rustrt
;
use
os
;
use
thunk
::
Thunk
;
// Reexport some of our utilities which are expected by other crates.
pub
use
self
::
util
::{
default_sched_threads
,
min_stack
,
running_on_valgrind
};
...
...
@@ -87,10 +88,10 @@ pub fn init(argc: int, argv: *const *const u8) {
#[lang
=
"start"
]
fn
lang_start
(
main
:
*
const
u8
,
argc
:
int
,
argv
:
*
const
*
const
u8
)
->
int
{
use
mem
;
start
(
argc
,
argv
,
proc
()
{
start
(
argc
,
argv
,
Thunk
::
new
(
move
||
{
let
main
:
extern
"Rust"
fn
()
=
unsafe
{
mem
::
transmute
(
main
)
};
main
();
})
})
)
}
/// Executes the given procedure after initializing the runtime with the given
...
...
@@ -102,7 +103,7 @@ fn lang_start(main: *const u8, argc: int, argv: *const *const u8) -> int {
///
/// This function will only return once *all* native threads in the system have
/// exited.
pub
fn
start
(
argc
:
int
,
argv
:
*
const
*
const
u8
,
main
:
proc
()
)
->
int
{
pub
fn
start
(
argc
:
int
,
argv
:
*
const
*
const
u8
,
main
:
Thunk
)
->
int
{
use
prelude
::
*
;
use
rt
;
use
rustrt
::
task
::
Task
;
...
...
@@ -144,7 +145,7 @@ pub fn start(argc: int, argv: *const *const u8, main: proc()) -> int {
unsafe
{
rustrt
::
stack
::
record_os_managed_stack_bounds
(
my_stack_bottom
,
my_stack_top
);
}
(
main
.take
()
.unwrap
())
(
);
(
main
.take
()
.unwrap
())
.invoke
(()
);
exit_code
=
Some
(
os
::
get_exit_status
());
})
.destroy
());
unsafe
{
rt
::
cleanup
();
}
...
...
src/libstd/sync/future.rs
浏览文件 @
d6133817
...
...
@@ -17,7 +17,7 @@
//! use std::sync::Future;
//! # fn fib(n: uint) -> uint {42};
//! # fn make_a_sandwich() {};
//! let mut delayed_fib = Future::spawn(
proc()
{ fib(5000) });
//! let mut delayed_fib = Future::spawn(
move||
{ fib(5000) });
//! make_a_sandwich();
//! println!("fib(5000) = {}", delayed_fib.get())
//! ```
...
...
@@ -30,6 +30,7 @@
use
self
::
FutureState
::
*
;
use
comm
::{
Receiver
,
channel
};
use
task
::
spawn
;
use
thunk
::{
Thunk
};
/// A type encapsulating the result of a computation which may not be complete
pub
struct
Future
<
A
>
{
...
...
@@ -37,7 +38,7 @@ pub struct Future<A> {
}
enum
FutureState
<
A
>
{
Pending
(
proc
():
Send
->
A
),
Pending
(
Thunk
<
(),
A
>
),
Evaluating
,
Forced
(
A
)
}
...
...
@@ -78,7 +79,7 @@ pub fn get_ref<'a>(&'a mut self) -> &'a A {
match
replace
(
&
mut
self
.state
,
Evaluating
)
{
Forced
(
_
)
|
Evaluating
=>
panic!
(
"Logic error."
),
Pending
(
f
)
=>
{
self
.state
=
Forced
(
f
(
));
self
.state
=
Forced
(
f
.invoke
(()
));
self
.get_ref
()
}
}
...
...
@@ -97,7 +98,9 @@ pub fn from_value(val: A) -> Future<A> {
Future
{
state
:
Forced
(
val
)}
}
pub
fn
from_fn
(
f
:
proc
():
Send
->
A
)
->
Future
<
A
>
{
pub
fn
from_fn
<
F
>
(
f
:
F
)
->
Future
<
A
>
where
F
:
FnOnce
()
->
A
,
F
:
Send
{
/*!
* Create a future from a function.
*
...
...
@@ -106,7 +109,7 @@ pub fn from_fn(f: proc():Send -> A) -> Future<A> {
* function. It is not spawned into another task.
*/
Future
{
state
:
Pending
(
f
)}
Future
{
state
:
Pending
(
Thunk
::
new
(
f
)
)}
}
}
...
...
@@ -119,12 +122,14 @@ pub fn from_receiver(rx: Receiver<A>) -> Future<A> {
* waiting for the result to be received on the port.
*/
Future
::
from_fn
(
proc
()
{
Future
::
from_fn
(
move
|:|
{
rx
.recv
()
})
}
pub
fn
spawn
(
blk
:
proc
():
Send
->
A
)
->
Future
<
A
>
{
pub
fn
spawn
<
F
>
(
blk
:
F
)
->
Future
<
A
>
where
F
:
FnOnce
()
->
A
,
F
:
Send
{
/*!
* Create a future from a unique closure.
*
...
...
@@ -134,7 +139,7 @@ pub fn spawn(blk: proc():Send -> A) -> Future<A> {
let
(
tx
,
rx
)
=
channel
();
spawn
(
proc
()
{
spawn
(
move
|:|
{
// Don't panic if the other end has hung up
let
_
=
tx
.send_opt
(
blk
());
});
...
...
@@ -166,7 +171,7 @@ fn test_from_receiver() {
#[test]
fn
test_from_fn
()
{
let
mut
f
=
Future
::
from_fn
(
proc
()
"brail"
.to_string
());
let
mut
f
=
Future
::
from_fn
(
move
||
"brail"
.to_string
());
assert_eq!
(
f
.get
(),
"brail"
);
}
...
...
@@ -190,14 +195,14 @@ fn test_get_ref_method() {
#[test]
fn
test_spawn
()
{
let
mut
f
=
Future
::
spawn
(
proc
()
"bale"
.to_string
());
let
mut
f
=
Future
::
spawn
(
move
||
"bale"
.to_string
());
assert_eq!
(
f
.get
(),
"bale"
);
}
#[test]
#[should_fail]
fn
test_future_panic
()
{
let
mut
f
=
Future
::
spawn
(
proc
()
panic!
());
let
mut
f
=
Future
::
spawn
(
move
||
panic!
());
let
_
x
:
String
=
f
.get
();
}
...
...
@@ -205,8 +210,8 @@ fn test_future_panic() {
fn
test_sendable_future
()
{
let
expected
=
"schlorf"
;
let
(
tx
,
rx
)
=
channel
();
let
f
=
Future
::
spawn
(
proc
()
{
expected
});
task
::
spawn
(
proc
()
{
let
f
=
Future
::
spawn
(
move
||
{
expected
});
task
::
spawn
(
move
||
{
let
mut
f
=
f
;
tx
.send
(
f
.get
());
});
...
...
src/libstd/sync/task_pool.rs
浏览文件 @
d6133817
...
...
@@ -72,7 +72,7 @@ pub struct TaskPool {
//
// This is the only such Sender, so when it is dropped all subtasks will
// quit.
jobs
:
Sender
<
proc
():
Send
>
jobs
:
Sender
<
Thunk
>
}
impl
TaskPool
{
...
...
@@ -84,7 +84,7 @@ impl TaskPool {
pub
fn
new
(
tasks
:
uint
)
->
TaskPool
{
assert
!
(
tasks
>=
1
);
let
(
tx
,
rx
)
=
channel
::
<
proc
():
Send
>
();
let
(
tx
,
rx
)
=
channel
::
<
Thunk
>
();
let
rx
=
Arc
::
new
(
Mutex
::
new
(
rx
));
// Taskpool tasks.
...
...
@@ -96,13 +96,15 @@ pub fn new(tasks: uint) -> TaskPool {
}
/// Executes the function `job` on a task in the pool.
pub
fn
execute
(
&
self
,
job
:
proc
():
Send
)
{
self
.jobs
.send
(
job
);
pub
fn
execute
<
F
>
(
&
self
,
job
:
F
)
where
F
:
FnOnce
(),
F
:
Send
{
self
.jobs
.send
(
Thunk
::
new
(
job
));
}
}
fn
spawn_in_pool
(
jobs
:
Arc
<
Mutex
<
Receiver
<
proc
():
Send
>>>
)
{
spawn
(
proc
()
{
fn
spawn_in_pool
(
jobs
:
Arc
<
Mutex
<
Receiver
<
Thunk
>>>
)
{
spawn
(
move
|:|
{
// Will spawn a new task on panic unless it is cancelled.
let
sentinel
=
Sentinel
::
new
(
&
jobs
);
...
...
@@ -115,7 +117,7 @@ fn spawn_in_pool(jobs: Arc<Mutex<Receiver<proc(): Send>>>) {
};
match
message
{
Ok
(
job
)
=>
job
(
),
Ok
(
job
)
=>
job
.invoke
(()
),
// The Taskpool was dropped.
Err
(
..
)
=>
break
...
...
src/libstd/sys/unix/process.rs
浏览文件 @
d6133817
...
...
@@ -531,8 +531,11 @@ pub fn try_wait(&self) -> Option<ProcessExit> {
}
}
fn
with_argv
<
T
>
(
prog
:
&
CString
,
args
:
&
[
CString
],
cb
:
proc
(
*
const
*
const
libc
::
c_char
)
->
T
)
->
T
{
fn
with_argv
<
T
,
F
>
(
prog
:
&
CString
,
args
:
&
[
CString
],
cb
:
F
)
->
T
where
F
:
FnOnce
(
*
const
*
const
libc
::
c_char
)
->
T
{
let
mut
ptrs
:
Vec
<*
const
libc
::
c_char
>
=
Vec
::
with_capacity
(
args
.len
()
+
1
);
// Convert the CStrings into an array of pointers. Note: the
...
...
@@ -549,9 +552,12 @@ fn with_argv<T>(prog: &CString, args: &[CString],
cb
(
ptrs
.as_ptr
())
}
fn
with_envp
<
K
,
V
,
T
>
(
env
:
Option
<&
collections
::
HashMap
<
K
,
V
>>
,
cb
:
proc
(
*
const
c_void
)
->
T
)
->
T
where
K
:
BytesContainer
+
Eq
+
Hash
,
V
:
BytesContainer
fn
with_envp
<
K
,
V
,
T
,
F
>
(
env
:
Option
<&
collections
::
HashMap
<
K
,
V
>>
,
cb
:
F
)
->
T
where
F
:
FnOnce
(
*
const
c_void
)
->
T
,
K
:
BytesContainer
+
Eq
+
Hash
,
V
:
BytesContainer
{
// On posixy systems we can pass a char** for envp, which is a
// null-terminated array of "k=v\0" strings. Since we must create
...
...
src/libstd/task.rs
浏览文件 @
d6133817
...
...
@@ -35,7 +35,7 @@
//! ## Example
//!
//! ```rust
//! spawn(
proc()
{
//! spawn(
move||
{
//! println!("Hello, World!");
//! })
//! ```
...
...
@@ -47,6 +47,7 @@
use
borrow
::
IntoCow
;
use
boxed
::
Box
;
use
comm
::
channel
;
use
core
::
ops
::
FnOnce
;
use
io
::{
Writer
,
stdio
};
use
kinds
::{
Send
,
marker
};
use
option
::
Option
;
...
...
@@ -57,6 +58,7 @@
use
rustrt
::
task
;
use
str
::
SendStr
;
use
string
::{
String
,
ToString
};
use
thunk
::{
Thunk
};
use
sync
::
Future
;
/// The task builder type.
...
...
@@ -80,7 +82,7 @@ pub struct TaskBuilder {
// Task-local stderr
stderr
:
Option
<
Box
<
Writer
+
Send
>>
,
// Optionally wrap the eventual task body
gen_body
:
Option
<
proc
(
v
:
proc
():
Send
):
Send
->
proc
():
Send
>
,
gen_body
:
Option
<
Thunk
<
Thunk
,
Thunk
>
>
,
nocopy
:
marker
::
NoCopy
,
}
...
...
@@ -129,41 +131,46 @@ pub fn stderr(mut self, stderr: Box<Writer + Send>) -> TaskBuilder {
}
// Where spawning actually happens (whether yielding a future or not)
fn
spawn_internal
(
self
,
f
:
proc
():
Send
,
on_exit
:
Option
<
proc
(
Result
<
(),
Box
<
Any
+
Send
>>
):
Send
>
)
{
fn
spawn_internal
(
self
,
f
:
Thunk
,
on_exit
:
Option
<
Thunk
<
task
::
Result
>>
)
{
let
TaskBuilder
{
name
,
stack_size
,
stdout
,
stderr
,
mut
gen_body
,
nocopy
:
_
}
=
self
;
let
f
=
match
gen_body
.take
()
{
Some
(
gen
)
=>
gen
(
f
),
Some
(
gen
)
=>
gen
.invoke
(
f
),
None
=>
f
};
let
opts
=
task
::
TaskOpts
{
on_exit
:
on_exit
,
name
:
name
,
stack_size
:
stack_size
,
};
if
stdout
.is_some
()
||
stderr
.is_some
()
{
Task
::
spawn
(
opts
,
proc
()
{
Task
::
spawn
(
opts
,
move
|:|
{
let
_
=
stdout
.map
(
stdio
::
set_stdout
);
let
_
=
stderr
.map
(
stdio
::
set_stderr
);
f
(
);
})
f
.invoke
(()
);
})
;
}
else
{
Task
::
spawn
(
opts
,
f
)
Task
::
spawn
(
opts
,
move
|:|
f
.invoke
(())
)
}
}
/// Creates and executes a new child task.
///
/// Sets up a new task with its own call stack and schedules it to run
/// the provided
proc
. The task has the properties and behavior
/// the provided
function
. The task has the properties and behavior
/// specified by the `TaskBuilder`.
pub
fn
spawn
(
self
,
f
:
proc
():
Send
)
{
self
.spawn_internal
(
f
,
None
)
pub
fn
spawn
<
F
:
FnOnce
()
+
Send
>
(
self
,
f
:
F
)
{
self
.spawn_internal
(
Thunk
::
new
(
f
)
,
None
)
}
/// Execute a
proc
in a newly-spawned task and return a future representing
/// Execute a
function
in a newly-spawned task and return a future representing
/// the task's result. The task has the properties and behavior
/// specified by the `TaskBuilder`.
///
...
...
@@ -178,20 +185,22 @@ pub fn spawn(self, f: proc():Send) {
/// `result::Result::Err` containing the argument to `panic!(...)` as an
/// `Any` trait object.
#[experimental
=
"Futures are experimental."
]
pub
fn
try_future
<
T
:
Send
>
(
self
,
f
:
proc
():
Send
->
T
)
->
Future
<
Result
<
T
,
Box
<
Any
+
Send
>>>
{
// currently, the on_exit
proc
provided by librustrt only works for unit
pub
fn
try_future
<
T
:
Send
,
F
:
FnOnce
()
->
(
T
)
+
Send
>
(
self
,
f
:
F
)
->
Future
<
Result
<
T
,
Box
<
Any
+
Send
>>>
{
// currently, the on_exit
fn
provided by librustrt only works for unit
// results, so we use an additional side-channel to communicate the
// result.
let
(
tx_done
,
rx_done
)
=
channel
();
// signal that task has exited
let
(
tx_retv
,
rx_retv
)
=
channel
();
// return value from task
let
on_exit
=
proc
(
res
)
{
let
_
=
tx_done
.send_opt
(
res
);
};
self
.spawn_internal
(
proc
()
{
let
_
=
tx_retv
.send_opt
(
f
());
},
let
on_exit
:
Thunk
<
task
::
Result
>
=
Thunk
::
with_arg
(
move
|:
res
:
task
::
Result
|
{
let
_
=
tx_done
.send_opt
(
res
);
});
self
.spawn_internal
(
Thunk
::
new
(
move
|:|
{
let
_
=
tx_retv
.send_opt
(
f
());
}),
Some
(
on_exit
));
Future
::
from_fn
(
proc
()
{
Future
::
from_fn
(
move
|:|
{
rx_done
.recv
()
.map
(|
_
|
rx_retv
.recv
())
})
}
...
...
@@ -199,7 +208,9 @@ pub fn try_future<T:Send>(self, f: proc():Send -> T)
/// Execute a function in a newly-spawnedtask and block until the task
/// completes or panics. Equivalent to `.try_future(f).unwrap()`.
#[unstable
=
"Error type may change."
]
pub
fn
try
<
T
:
Send
>
(
self
,
f
:
proc
():
Send
->
T
)
->
Result
<
T
,
Box
<
Any
+
Send
>>
{
pub
fn
try
<
T
,
F
>
(
self
,
f
:
F
)
->
Result
<
T
,
Box
<
Any
+
Send
>>
where
F
:
FnOnce
()
->
T
,
F
:
Send
,
T
:
Send
{
self
.try_future
(
f
)
.into_inner
()
}
}
...
...
@@ -212,7 +223,7 @@ pub fn try<T:Send>(self, f: proc():Send -> T) -> Result<T, Box<Any + Send>> {
/// the provided unique closure.
///
/// This function is equivalent to `TaskBuilder::new().spawn(f)`.
pub
fn
spawn
(
f
:
proc
():
Send
)
{
pub
fn
spawn
<
F
:
FnOnce
()
+
Send
>
(
f
:
F
)
{
TaskBuilder
::
new
()
.spawn
(
f
)
}
...
...
@@ -221,7 +232,9 @@ pub fn spawn(f: proc(): Send) {
///
/// This is equivalent to `TaskBuilder::new().try`.
#[unstable
=
"Error type may change."
]
pub
fn
try
<
T
:
Send
>
(
f
:
proc
():
Send
->
T
)
->
Result
<
T
,
Box
<
Any
+
Send
>>
{
pub
fn
try
<
T
,
F
>
(
f
:
F
)
->
Result
<
T
,
Box
<
Any
+
Send
>>
where
T
:
Send
,
F
:
FnOnce
()
->
T
,
F
:
Send
{
TaskBuilder
::
new
()
.try
(
f
)
}
...
...
@@ -230,11 +243,12 @@ pub fn try<T: Send>(f: proc(): Send -> T) -> Result<T, Box<Any + Send>> {
///
/// This is equivalent to `TaskBuilder::new().try_future`.
#[experimental
=
"Futures are experimental."
]
pub
fn
try_future
<
T
:
Send
>
(
f
:
proc
():
Send
->
T
)
->
Future
<
Result
<
T
,
Box
<
Any
+
Send
>>>
{
pub
fn
try_future
<
T
,
F
>
(
f
:
F
)
->
Future
<
Result
<
T
,
Box
<
Any
+
Send
>>>
where
T
:
Send
,
F
:
FnOnce
()
->
T
,
F
:
Send
{
TaskBuilder
::
new
()
.try_future
(
f
)
}
/* Lifecycle functions */
/// Read the name of the current task.
...
...
@@ -274,6 +288,8 @@ mod test {
use
result
;
use
std
::
io
::{
ChanReader
,
ChanWriter
};
use
string
::
String
;
use
thunk
::
Thunk
;
use
prelude
::
*
;
use
super
::
*
;
// !!! These tests are dangerous. If something is buggy, they will hang, !!!
...
...
@@ -281,28 +297,28 @@ mod test {
#[test]
fn
test_unnamed_task
()
{
try
(
proc
()
{
try
(
move
||
{
assert
!
(
name
()
.is_none
());
})
.map_err
(|
_
|
())
.unwrap
();
}
#[test]
fn
test_owned_named_task
()
{
TaskBuilder
::
new
()
.named
(
"ada lovelace"
.to_string
())
.try
(
proc
()
{
TaskBuilder
::
new
()
.named
(
"ada lovelace"
.to_string
())
.try
(
move
||
{
assert
!
(
name
()
.unwrap
()
==
"ada lovelace"
);
})
.map_err
(|
_
|
())
.unwrap
();
}
#[test]
fn
test_static_named_task
()
{
TaskBuilder
::
new
()
.named
(
"ada lovelace"
)
.try
(
proc
()
{
TaskBuilder
::
new
()
.named
(
"ada lovelace"
)
.try
(
move
||
{
assert
!
(
name
()
.unwrap
()
==
"ada lovelace"
);
})
.map_err
(|
_
|
())
.unwrap
();
}
#[test]
fn
test_send_named_task
()
{
TaskBuilder
::
new
()
.named
(
"ada lovelace"
.into_cow
())
.try
(
proc
()
{
TaskBuilder
::
new
()
.named
(
"ada lovelace"
.into_cow
())
.try
(
move
||
{
assert
!
(
name
()
.unwrap
()
==
"ada lovelace"
);
})
.map_err
(|
_
|
())
.unwrap
();
}
...
...
@@ -310,7 +326,7 @@ fn test_send_named_task() {
#[test]
fn
test_run_basic
()
{
let
(
tx
,
rx
)
=
channel
();
TaskBuilder
::
new
()
.spawn
(
proc
()
{
TaskBuilder
::
new
()
.spawn
(
move
||
{
tx
.send
(());
});
rx
.recv
();
...
...
@@ -318,10 +334,10 @@ fn test_run_basic() {
#[test]
fn
test_try_future
()
{
let
result
=
TaskBuilder
::
new
()
.try_future
(
proc
()
{});
let
result
=
TaskBuilder
::
new
()
.try_future
(
move
||
{});
assert
!
(
result
.unwrap
()
.is_ok
());
let
result
=
TaskBuilder
::
new
()
.try_future
(
proc
()
->
()
{
let
result
=
TaskBuilder
::
new
()
.try_future
(
move
||
->
()
{
panic!
();
});
assert
!
(
result
.unwrap
()
.is_err
());
...
...
@@ -329,7 +345,7 @@ fn test_try_future() {
#[test]
fn
test_try_success
()
{
match
try
(
proc
()
{
match
try
(
move
||
{
"Success!"
.to_string
()
})
.as_ref
()
.map
(|
s
|
s
.as_slice
())
{
result
::
Result
::
Ok
(
"Success!"
)
=>
(),
...
...
@@ -339,7 +355,7 @@ fn test_try_success() {
#[test]
fn
test_try_panic
()
{
match
try
(
proc
()
{
match
try
(
move
||
{
panic!
()
})
{
result
::
Result
::
Err
(
_
)
=>
(),
...
...
@@ -355,7 +371,7 @@ fn test_spawn_sched() {
fn
f
(
i
:
int
,
tx
:
Sender
<
()
>
)
{
let
tx
=
tx
.clone
();
spawn
(
proc
()
{
spawn
(
move
||
{
if
i
==
0
{
tx
.send
(());
}
else
{
...
...
@@ -372,8 +388,8 @@ fn f(i: int, tx: Sender<()>) {
fn
test_spawn_sched_childs_on_default_sched
()
{
let
(
tx
,
rx
)
=
channel
();
spawn
(
proc
()
{
spawn
(
proc
()
{
spawn
(
move
||
{
spawn
(
move
||
{
tx
.send
(());
});
});
...
...
@@ -382,17 +398,17 @@ fn test_spawn_sched_childs_on_default_sched() {
}
fn
avoid_copying_the_body
<
F
>
(
spawnfn
:
F
)
where
F
:
FnOnce
(
proc
():
Send
),
F
:
FnOnce
(
Thunk
),
{
let
(
tx
,
rx
)
=
channel
::
<
uint
>
();
let
x
=
box
1
;
let
x_in_parent
=
(
&*
x
)
as
*
const
int
as
uint
;
spawnfn
(
proc
()
{
spawnfn
(
Thunk
::
new
(
move
||
{
let
x_in_child
=
(
&*
x
)
as
*
const
int
as
uint
;
tx
.send
(
x_in_child
);
});
})
)
;
let
x_in_child
=
rx
.recv
();
assert_eq!
(
x_in_parent
,
x_in_child
);
...
...
@@ -400,25 +416,21 @@ fn avoid_copying_the_body<F>(spawnfn: F) where
#[test]
fn
test_avoid_copying_the_body_spawn
()
{
avoid_copying_the_body
(
spawn
);
avoid_copying_the_body
(
|
t
|
spawn
(
move
||
t
.invoke
(()))
);
}
#[test]
fn
test_avoid_copying_the_body_task_spawn
()
{
avoid_copying_the_body
(|
f
|
{
let
builder
=
TaskBuilder
::
new
();
builder
.spawn
(
proc
()
{
f
();
});
builder
.spawn
(
move
||
f
.invoke
(()));
})
}
#[test]
fn
test_avoid_copying_the_body_try
()
{
avoid_copying_the_body
(|
f
|
{
let
_
=
try
(
proc
()
{
f
()
});
let
_
=
try
(
move
||
f
.invoke
(()));
})
}
...
...
@@ -429,24 +441,24 @@ fn test_child_doesnt_ref_parent() {
// (well, it would if the constant were 8000+ - I lowered it to be more
// valgrind-friendly. try this at home, instead..!)
static
GENERATIONS
:
uint
=
16
;
fn
child_no
(
x
:
uint
)
->
proc
():
Send
{
return
proc
()
{
fn
child_no
(
x
:
uint
)
->
Thunk
{
return
Thunk
::
new
(
move
||
{
if
x
<
GENERATIONS
{
TaskBuilder
::
new
()
.spawn
(
child_no
(
x
+
1
));
TaskBuilder
::
new
()
.spawn
(
move
||
child_no
(
x
+
1
)
.invoke
(()
));
}
}
}
);
}
TaskBuilder
::
new
()
.spawn
(
child_no
(
0
));
TaskBuilder
::
new
()
.spawn
(
||
child_no
(
0
)
.invoke
(()
));
}
#[test]
fn
test_simple_newsched_spawn
()
{
spawn
(
proc
()
())
spawn
(
move
||
())
}
#[test]
fn
test_try_panic_message_static_str
()
{
match
try
(
proc
()
{
match
try
(
move
||
{
panic!
(
"static string"
);
})
{
Err
(
e
)
=>
{
...
...
@@ -460,7 +472,7 @@ fn test_try_panic_message_static_str() {
#[test]
fn
test_try_panic_message_owned_str
()
{
match
try
(
proc
()
{
match
try
(
move
||
{
panic!
(
"owned string"
.to_string
());
})
{
Err
(
e
)
=>
{
...
...
@@ -474,7 +486,7 @@ fn test_try_panic_message_owned_str() {
#[test]
fn
test_try_panic_message_any
()
{
match
try
(
proc
()
{
match
try
(
move
||
{
panic!
(
box
413u16
as
Box
<
Any
+
Send
>
);
})
{
Err
(
e
)
=>
{
...
...
@@ -492,7 +504,7 @@ fn test_try_panic_message_any() {
fn
test_try_panic_message_unit_struct
()
{
struct
Juju
;
match
try
(
proc
()
{
match
try
(
move
||
{
panic!
(
Juju
)
})
{
Err
(
ref
e
)
if
e
.is
::
<
Juju
>
()
=>
{}
...
...
@@ -507,7 +519,7 @@ fn test_stdout() {
let
stdout
=
ChanWriter
::
new
(
tx
);
let
r
=
TaskBuilder
::
new
()
.stdout
(
box
stdout
as
Box
<
Writer
+
Send
>
)
.try
(
proc
()
{
.try
(
move
||
{
print!
(
"Hello, world!"
);
});
assert
!
(
r
.is_ok
());
...
...
@@ -527,7 +539,7 @@ fn task_abort_no_kill_runtime() {
use
mem
;
let
tb
=
TaskBuilder
::
new
();
let
rx
=
tb
.try_future
(
proc
()
{});
let
rx
=
tb
.try_future
(
move
||
{});
mem
::
drop
(
rx
);
timer
::
sleep
(
Duration
::
milliseconds
(
1000
));
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录