Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
int
Rust
提交
786dea20
R
Rust
项目概览
int
/
Rust
11 个月 前同步成功
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rust
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
786dea20
编写于
12月 03, 2013
作者:
P
Patrick Walton
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
libextra: Another round of de-`Cell`-ing.
34 uses of `Cell` remain.
上级
5aad292f
变更
35
隐藏空白更改
内联
并排
Showing
35 changed file
with
211 addition
and
387 deletion
+211
-387
src/libextra/arc.rs
src/libextra/arc.rs
+3
-3
src/libextra/future.rs
src/libextra/future.rs
+3
-6
src/libextra/test.rs
src/libextra/test.rs
+1
-2
src/libextra/workcache.rs
src/libextra/workcache.rs
+0
-3
src/librustdoc/lib.rs
src/librustdoc/lib.rs
+5
-6
src/librustdoc/passes.rs
src/librustdoc/passes.rs
+3
-6
src/librustuv/async.rs
src/librustuv/async.rs
+2
-3
src/librustuv/net.rs
src/librustuv/net.rs
+28
-53
src/librustuv/pipe.rs
src/librustuv/pipe.rs
+2
-5
src/librustuv/signal.rs
src/librustuv/signal.rs
+1
-3
src/librustuv/timer.rs
src/librustuv/timer.rs
+6
-7
src/libstd/io/net/tcp.rs
src/libstd/io/net/tcp.rs
+30
-61
src/libstd/io/net/udp.rs
src/libstd/io/net/udp.rs
+8
-17
src/libstd/io/net/unix.rs
src/libstd/io/net/unix.rs
+7
-15
src/libstd/rt/comm.rs
src/libstd/rt/comm.rs
+17
-35
src/libstd/rt/kill.rs
src/libstd/rt/kill.rs
+4
-3
src/libstd/rt/mod.rs
src/libstd/rt/mod.rs
+17
-16
src/libstd/rt/sched.rs
src/libstd/rt/sched.rs
+21
-36
src/libstd/rt/test.rs
src/libstd/rt/test.rs
+10
-20
src/libstd/task/mod.rs
src/libstd/task/mod.rs
+4
-11
src/libstd/task/spawn.rs
src/libstd/task/spawn.rs
+5
-11
src/libstd/unstable/mod.rs
src/libstd/unstable/mod.rs
+1
-3
src/test/bench/msgsend-ring-mutex-arcs.rs
src/test/bench/msgsend-ring-mutex-arcs.rs
+1
-6
src/test/bench/msgsend-ring-rw-arcs.rs
src/test/bench/msgsend-ring-rw-arcs.rs
+1
-6
src/test/bench/rt-messaging-ping-pong.rs
src/test/bench/rt-messaging-ping-pong.rs
+8
-14
src/test/bench/rt-parfib.rs
src/test/bench/rt-parfib.rs
+1
-3
src/test/bench/shootout-chameneos-redux.rs
src/test/bench/shootout-chameneos-redux.rs
+5
-4
src/test/bench/task-perf-jargon-metal-smoke.rs
src/test/bench/task-perf-jargon-metal-smoke.rs
+0
-3
src/test/compile-fail/no-send-res-ports.rs
src/test/compile-fail/no-send-res-ports.rs
+2
-3
src/test/compile-fail/no_freeze-rc.rs
src/test/compile-fail/no_freeze-rc.rs
+3
-3
src/test/run-pass/issue-2718.rs
src/test/run-pass/issue-2718.rs
+0
-2
src/test/run-pass/sendfn-spawn-with-fn-arg.rs
src/test/run-pass/sendfn-spawn-with-fn-arg.rs
+1
-3
src/test/run-pass/task-killjoin-rsrc.rs
src/test/run-pass/task-killjoin-rsrc.rs
+1
-3
src/test/run-pass/tempfile.rs
src/test/run-pass/tempfile.rs
+3
-5
src/test/run-pass/trait-bounds-in-arc.rs
src/test/run-pass/trait-bounds-in-arc.rs
+7
-7
未找到文件。
src/libextra/arc.rs
浏览文件 @
786dea20
...
...
@@ -628,10 +628,10 @@ fn test_mutex_arc_condvar() {
let
arc
=
~
MutexArc
::
new
(
false
);
let
arc2
=
~
arc
.clone
();
let
(
p
,
c
)
=
comm
::
oneshot
();
let
(
c
,
p
)
=
(
Cell
::
new
(
c
),
Cell
::
new
(
p
)
);
do
task
::
spawn
||
{
let
c
=
Cell
::
new
(
c
);
do
task
::
spawn
{
// wait until parent gets in
p
.
take
()
.
recv
();
p
.recv
();
arc2
.access_cond
(|
state
,
cond
|
{
*
state
=
true
;
cond
.signal
();
...
...
src/libextra/future.rs
浏览文件 @
786dea20
...
...
@@ -25,7 +25,6 @@
#[allow(missing_doc)]
;
use
std
::
cell
::
Cell
;
use
std
::
comm
::{
PortOne
,
oneshot
};
use
std
::
util
::
replace
;
...
...
@@ -113,9 +112,8 @@ pub fn from_port(port: PortOne<A>) -> Future<A> {
* waiting for the result to be received on the port.
*/
let
port
=
Cell
::
new
(
port
);
do
Future
::
from_fn
{
port
.
take
()
.
recv
()
port
.recv
()
}
}
...
...
@@ -141,7 +139,6 @@ pub fn spawn(blk: proc() -> A) -> Future<A> {
mod
test
{
use
future
::
Future
;
use
std
::
cell
::
Cell
;
use
std
::
comm
::
oneshot
;
use
std
::
task
;
...
...
@@ -199,9 +196,9 @@ fn test_futurefail() {
#[test]
fn
test_sendable_future
()
{
let
expected
=
"schlorf"
;
let
f
=
Cell
::
new
(
do
Future
::
spawn
{
expected
})
;
let
f
=
do
Future
::
spawn
{
expected
}
;
do
task
::
spawn
{
let
mut
f
=
f
.take
()
;
let
mut
f
=
f
;
let
actual
=
f
.get
();
assert_eq!
(
actual
,
expected
);
}
...
...
src/libextra/test.rs
浏览文件 @
786dea20
...
...
@@ -872,7 +872,6 @@ pub fn run_test(force_ignore: bool,
fn
run_test_inner
(
desc
:
TestDesc
,
monitor_ch
:
SharedChan
<
MonitorMsg
>
,
testfn
:
proc
())
{
let
testfn_cell
=
::
std
::
cell
::
Cell
::
new
(
testfn
);
do
task
::
spawn
{
let
mut
task
=
task
::
task
();
task
.name
(
match
desc
.name
{
...
...
@@ -880,7 +879,7 @@ fn run_test_inner(desc: TestDesc,
StaticTestName
(
name
)
=>
SendStrStatic
(
name
),
});
let
result_future
=
task
.future_result
();
task
.spawn
(
testfn
_cell
.take
()
);
task
.spawn
(
testfn
);
let
task_result
=
result_future
.recv
();
let
test_result
=
calc_result
(
&
desc
,
task_result
.is_ok
());
...
...
src/libextra/workcache.rs
浏览文件 @
786dea20
...
...
@@ -15,7 +15,6 @@
use
serialize
::{
Encoder
,
Encodable
,
Decoder
,
Decodable
};
use
arc
::{
Arc
,
RWArc
};
use
treemap
::
TreeMap
;
use
std
::
cell
::
Cell
;
use
std
::
comm
::{
PortOne
,
oneshot
};
use
std
::{
str
,
task
};
use
std
::
io
;
...
...
@@ -430,7 +429,6 @@ fn exec_work<'self, T:Send +
debug!
(
"Cache miss!"
);
let
(
port
,
chan
)
=
oneshot
();
let
blk
=
bo
.take_unwrap
();
let
chan
=
Cell
::
new
(
chan
);
// XXX: What happens if the task fails?
do
task
::
spawn
{
...
...
@@ -438,7 +436,6 @@ fn exec_work<'self, T:Send +
discovered_inputs
:
WorkMap
::
new
(),
discovered_outputs
:
WorkMap
::
new
(),
};
let
chan
=
chan
.take
();
let
v
=
blk
(
&
mut
exe
);
chan
.send
((
exe
,
v
));
}
...
...
src/librustdoc/lib.rs
浏览文件 @
786dea20
...
...
@@ -24,7 +24,6 @@
extern
mod
rustc
;
extern
mod
extra
;
use
std
::
cell
::
Cell
;
use
std
::
local_data
;
use
std
::
io
;
use
std
::
io
::
File
;
...
...
@@ -194,13 +193,13 @@ fn rust_input(cratefile: &str, matches: &getopts::Matches) -> Output {
let
mut
plugins
=
matches
.opt_strs
(
"plugins"
);
// First, parse the crate and extract all relevant information.
let
libs
=
Cell
::
new
(
matches
.opt_strs
(
"L"
)
.map
(|
s
|
Path
::
new
(
s
.as_slice
()
)));
let
cfgs
=
Cell
::
new
(
matches
.opt_strs
(
"cfg"
)
);
let
cr
=
Cell
::
new
(
Path
::
new
(
cratefile
)
);
let
libs
=
matches
.opt_strs
(
"L"
)
.map
(|
s
|
Path
::
new
(
s
.as_slice
(
)));
let
cfgs
=
matches
.opt_strs
(
"cfg"
);
let
cr
=
Path
::
new
(
cratefile
);
info!
(
"starting to run rustc"
);
let
(
crate
,
analysis
)
=
do
std
::
task
::
try
{
let
cr
=
cr
.take
()
;
core
::
run_core
(
libs
.
take
()
.move_iter
()
.collect
(),
cfgs
.take
()
,
&
cr
)
let
cr
=
cr
;
core
::
run_core
(
libs
.
move_iter
()
.collect
(),
cfgs
,
&
cr
)
}
.unwrap
();
info!
(
"finished with rustc"
);
local_data
::
set
(
analysiskey
,
analysis
);
...
...
src/librustdoc/passes.rs
浏览文件 @
786dea20
...
...
@@ -8,12 +8,10 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use
std
::
num
;
use
std
::
cell
::
Cell
;
use
std
::
uint
;
use
std
::
hashmap
::
HashSet
;
use
std
::
local_data
;
use
std
::
num
;
use
std
::
uint
;
use
syntax
::
ast
;
use
clean
;
...
...
@@ -56,11 +54,10 @@ fn fold_item(&mut self, i: Item) -> Option<Item> {
pub
fn
strip_private
(
crate
:
clean
::
Crate
)
->
plugins
::
PluginResult
{
// This stripper collects all *retained* nodes.
let
mut
retained
=
HashSet
::
new
();
let
crate
=
Cell
::
new
(
crate
);
let
exported_items
=
local_data
::
get
(
super
::
analysiskey
,
|
analysis
|
{
analysis
.unwrap
()
.exported_items
.clone
()
});
let
mut
crate
=
crate
.take
()
;
let
mut
crate
=
crate
;
// strip all private items
{
...
...
src/librustuv/async.rs
浏览文件 @
786dea20
...
...
@@ -125,7 +125,6 @@ fn drop(&mut self) {
#[cfg(test)]
mod
test_remote
{
use
std
::
cell
::
Cell
;
use
std
::
rt
::
rtio
::
Callback
;
use
std
::
rt
::
thread
::
Thread
;
use
std
::
rt
::
tube
::
Tube
;
...
...
@@ -150,10 +149,10 @@ fn call(&mut self) {
let
mut
tube
=
Tube
::
new
();
let
cb
=
~
MyCallback
(
Some
(
tube
.clone
()));
let
watcher
=
Cell
::
new
(
AsyncWatcher
::
new
(
local_loop
(),
cb
as
~
Callback
)
);
let
watcher
=
AsyncWatcher
::
new
(
local_loop
(),
cb
as
~
Callback
);
let
thread
=
do
Thread
::
start
{
watcher
.
take
()
.
fire
();
watcher
.fire
();
};
assert_eq!
(
tube
.recv
(),
1
);
...
...
src/librustuv/net.rs
浏览文件 @
786dea20
...
...
@@ -691,7 +691,6 @@ fn udp_bind_close_ip6() {
#[test]
fn
listen_ip4
()
{
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
let
addr
=
next_test_ip4
();
do
spawn
{
...
...
@@ -701,7 +700,7 @@ fn listen_ip4() {
let
mut
w
=
match
w
.listen
()
{
Ok
(
w
)
=>
w
,
Err
(
e
)
=>
fail
!
(
"{:?}"
,
e
),
};
chan
.
take
()
.
send
(());
chan
.send
(());
match
w
.accept
()
{
Ok
(
mut
stream
)
=>
{
let
mut
buf
=
[
0u8
,
..
10
];
...
...
@@ -728,7 +727,6 @@ fn listen_ip4() {
#[test]
fn
listen_ip6
()
{
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
let
addr
=
next_test_ip6
();
do
spawn
{
...
...
@@ -738,7 +736,7 @@ fn listen_ip6() {
let
mut
w
=
match
w
.listen
()
{
Ok
(
w
)
=>
w
,
Err
(
e
)
=>
fail
!
(
"{:?}"
,
e
),
};
chan
.
take
()
.
send
(());
chan
.send
(());
match
w
.accept
()
{
Ok
(
mut
stream
)
=>
{
let
mut
buf
=
[
0u8
,
..
10
];
...
...
@@ -765,14 +763,13 @@ fn listen_ip6() {
#[test]
fn
udp_recv_ip4
()
{
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
let
client
=
next_test_ip4
();
let
server
=
next_test_ip4
();
do
spawn
{
match
UdpWatcher
::
bind
(
local_loop
(),
server
)
{
Ok
(
mut
w
)
=>
{
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
buf
=
[
0u8
,
..
10
];
match
w
.recvfrom
(
buf
)
{
Ok
((
10
,
addr
))
=>
assert_eq!
(
addr
,
client
),
...
...
@@ -798,14 +795,13 @@ fn udp_recv_ip4() {
#[test]
fn
udp_recv_ip6
()
{
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
let
client
=
next_test_ip6
();
let
server
=
next_test_ip6
();
do
spawn
{
match
UdpWatcher
::
bind
(
local_loop
(),
server
)
{
Ok
(
mut
w
)
=>
{
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
buf
=
[
0u8
,
..
10
];
match
w
.recvfrom
(
buf
)
{
Ok
((
10
,
addr
))
=>
assert_eq!
(
addr
,
client
),
...
...
@@ -834,13 +830,11 @@ fn test_read_read_read() {
let
addr
=
next_test_ip4
();
static
MAX
:
uint
=
5000
;
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawn
{
let
listener
=
TcpListener
::
bind
(
local_loop
(),
addr
)
.unwrap
();
let
mut
acceptor
=
listener
.listen
()
.unwrap
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
stream
=
acceptor
.accept
()
.unwrap
();
let
buf
=
[
1
,
..
2048
];
let
mut
total_bytes_written
=
0
;
...
...
@@ -852,7 +846,7 @@ fn test_read_read_read() {
}
do
spawn
{
port
.
take
()
.
recv
();
port
.recv
();
let
mut
stream
=
TcpWatcher
::
connect
(
local_loop
(),
addr
)
.unwrap
();
let
mut
buf
=
[
0
,
..
2048
];
let
mut
total_bytes_read
=
0
;
...
...
@@ -873,18 +867,16 @@ fn test_udp_twice() {
let
server_addr
=
next_test_ip4
();
let
client_addr
=
next_test_ip4
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawn
{
let
mut
client
=
UdpWatcher
::
bind
(
local_loop
(),
client_addr
)
.unwrap
();
port
.
take
()
.
recv
();
port
.recv
();
assert
!
(
client
.sendto
([
1
],
server_addr
)
.is_ok
());
assert
!
(
client
.sendto
([
2
],
server_addr
)
.is_ok
());
}
let
mut
server
=
UdpWatcher
::
bind
(
local_loop
(),
server_addr
)
.unwrap
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
buf1
=
[
0
];
let
mut
buf2
=
[
0
];
let
(
nread1
,
src1
)
=
server
.recvfrom
(
buf1
)
.unwrap
();
...
...
@@ -908,14 +900,11 @@ fn test_udp_many_read() {
let
(
p1
,
c1
)
=
oneshot
();
let
(
p2
,
c2
)
=
oneshot
();
let
first
=
Cell
::
new
((
p1
,
c2
));
let
second
=
Cell
::
new
((
p2
,
c1
));
do
spawn
{
let
l
=
local_loop
();
let
mut
server_out
=
UdpWatcher
::
bind
(
l
,
server_out_addr
)
.unwrap
();
let
mut
server_in
=
UdpWatcher
::
bind
(
l
,
server_in_addr
)
.unwrap
();
let
(
port
,
chan
)
=
first
.take
(
);
let
(
port
,
chan
)
=
(
p1
,
c2
);
chan
.send
(());
port
.recv
();
let
msg
=
[
1
,
..
2048
];
...
...
@@ -939,7 +928,7 @@ fn test_udp_many_read() {
let
l
=
local_loop
();
let
mut
client_out
=
UdpWatcher
::
bind
(
l
,
client_out_addr
)
.unwrap
();
let
mut
client_in
=
UdpWatcher
::
bind
(
l
,
client_in_addr
)
.unwrap
();
let
(
port
,
chan
)
=
second
.take
(
);
let
(
port
,
chan
)
=
(
p2
,
c1
);
port
.recv
();
chan
.send
(());
let
mut
total_bytes_recv
=
0
;
...
...
@@ -966,14 +955,12 @@ fn test_udp_many_read() {
fn
test_read_and_block
()
{
let
addr
=
next_test_ip4
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawn
{
let
listener
=
TcpListener
::
bind
(
local_loop
(),
addr
)
.unwrap
();
let
mut
acceptor
=
listener
.listen
()
.unwrap
();
let
(
port2
,
chan2
)
=
stream
();
chan
.
take
()
.
send
(
port2
);
chan
.send
(
port2
);
let
mut
stream
=
acceptor
.accept
()
.unwrap
();
let
mut
buf
=
[
0
,
..
2048
];
...
...
@@ -998,7 +985,7 @@ fn test_read_and_block() {
}
do
spawn
{
let
port2
=
port
.
take
()
.
recv
();
let
port2
=
port
.recv
();
let
mut
stream
=
TcpWatcher
::
connect
(
local_loop
(),
addr
)
.unwrap
();
stream
.write
([
0
,
1
,
2
,
3
,
4
,
5
,
6
,
7
]);
stream
.write
([
0
,
1
,
2
,
3
,
4
,
5
,
6
,
7
]);
...
...
@@ -1041,18 +1028,14 @@ fn test_simple_tcp_server_and_client_on_diff_threads() {
#[test]
fn
test_homing_closes_correctly
()
{
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
task
::
spawn_sched
(
task
::
SingleThreaded
)
{
let
chan
=
Cell
::
new
(
chan
.take
());
let
listener
=
UdpWatcher
::
bind
(
local_loop
(),
next_test_ip4
())
.unwrap
();
chan
.
take
()
.
send
(
listener
);
chan
.send
(
listener
);
}
do
task
::
spawn_sched
(
task
::
SingleThreaded
)
{
let
port
=
Cell
::
new
(
port
.take
());
port
.take
()
.recv
();
port
.recv
();
}
}
...
...
@@ -1086,13 +1069,13 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
let
mut
sched2
=
~
Scheduler
::
new
(
loop2
,
worker2
,
queues
.clone
(),
sleepers
.clone
());
let
handle1
=
Cell
::
new
(
sched1
.make_handle
()
);
let
handle2
=
Cell
::
new
(
sched2
.make_handle
()
);
let
handle1
=
sched1
.make_handle
(
);
let
handle2
=
sched2
.make_handle
(
);
let
tasksFriendHandle
=
Cell
::
new
(
sched2
.make_handle
());
let
on_exit
:
proc
(
UnwindResult
)
=
proc
(
exit_status
)
{
handle1
.
take
()
.
send
(
Shutdown
);
handle2
.
take
()
.
send
(
Shutdown
);
handle1
.send
(
Shutdown
);
handle2
.send
(
Shutdown
);
assert
!
(
exit_status
.is_success
());
};
...
...
@@ -1133,19 +1116,16 @@ unsafe fn local_io() -> &'static mut IoFactory {
let
mut
main_task
=
~
Task
::
new_root
(
&
mut
sched1
.stack_pool
,
None
,
test_function
);
main_task
.death.on_exit
=
Some
(
on_exit
);
let
main_task
=
Cell
::
new
(
main_task
);
let
null_task
=
Cell
::
new
(
~
do
Task
::
new_root
(
&
mut
sched2
.stack_pool
,
None
)
||
{});
let
sched1
=
Cell
::
new
(
sched1
);
let
sched2
=
Cell
::
new
(
sched2
);
let
null_task
=
~
do
Task
::
new_root
(
&
mut
sched2
.stack_pool
,
None
)
{
// nothing
};
let
thread1
=
do
Thread
::
start
{
sched1
.
take
()
.bootstrap
(
main_task
.take
()
);
sched1
.
bootstrap
(
main_task
);
};
let
thread2
=
do
Thread
::
start
{
sched2
.
take
()
.bootstrap
(
null_task
.take
()
);
sched2
.
bootstrap
(
null_task
);
};
thread1
.join
();
...
...
@@ -1164,13 +1144,12 @@ fn tcp_listener_fail_cleanup() {
#[should_fail]
#[test]
fn
tcp_stream_fail_cleanup
()
{
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
let
addr
=
next_test_ip4
();
do
spawn
{
let
w
=
TcpListener
::
bind
(
local_loop
(),
addr
)
.unwrap
();
let
mut
w
=
w
.listen
()
.unwrap
();
chan
.
take
()
.
send
(());
chan
.send
(());
w
.accept
();
}
port
.recv
();
...
...
@@ -1189,14 +1168,13 @@ fn udp_listener_fail_cleanup() {
fn
udp_fail_other_task
()
{
let
addr
=
next_test_ip4
();
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
// force the handle to be created on a different scheduler, failure in
// the original task will force a homing operation back to this
// scheduler.
do
task
::
spawn_sched
(
task
::
SingleThreaded
)
{
let
w
=
UdpWatcher
::
bind
(
local_loop
(),
addr
)
.unwrap
();
chan
.
take
()
.
send
(
w
);
chan
.send
(
w
);
}
let
_
w
=
port
.recv
();
...
...
@@ -1208,13 +1186,12 @@ fn udp_fail_other_task() {
#[ignore(reason
=
"linked failure"
)]
fn
linked_failure1
()
{
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
let
addr
=
next_test_ip4
();
do
spawn
{
let
w
=
TcpListener
::
bind
(
local_loop
(),
addr
)
.unwrap
();
let
mut
w
=
w
.listen
()
.unwrap
();
chan
.
take
()
.
send
(());
chan
.send
(());
w
.accept
();
}
...
...
@@ -1227,13 +1204,12 @@ fn linked_failure1() {
#[ignore(reason
=
"linked failure"
)]
fn
linked_failure2
()
{
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
let
addr
=
next_test_ip4
();
do
spawn
{
let
w
=
TcpListener
::
bind
(
local_loop
(),
addr
)
.unwrap
();
let
mut
w
=
w
.listen
()
.unwrap
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
buf
=
[
0
];
w
.accept
()
.unwrap
()
.read
(
buf
);
}
...
...
@@ -1249,11 +1225,10 @@ fn linked_failure2() {
#[ignore(reason
=
"linked failure"
)]
fn
linked_failure3
()
{
let
(
port
,
chan
)
=
stream
();
let
chan
=
Cell
::
new
(
chan
);
let
addr
=
next_test_ip4
();
do
spawn
{
let
chan
=
chan
.take
()
;
let
chan
=
chan
;
let
w
=
TcpListener
::
bind
(
local_loop
(),
addr
)
.unwrap
();
let
mut
w
=
w
.listen
()
.unwrap
();
chan
.send
(());
...
...
src/librustuv/pipe.rs
浏览文件 @
786dea20
...
...
@@ -231,7 +231,6 @@ fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
#[cfg(test)]
mod
tests
{
use
std
::
cell
::
Cell
;
use
std
::
comm
::
oneshot
;
use
std
::
rt
::
rtio
::{
RtioUnixListener
,
RtioUnixAcceptor
,
RtioPipe
};
use
std
::
rt
::
test
::
next_test_unix
;
...
...
@@ -276,12 +275,11 @@ fn connect() {
let
path
=
next_test_unix
();
let
path2
=
path
.clone
();
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
do
spawn
{
let
p
=
PipeListener
::
bind
(
local_loop
(),
&
path2
.to_c_str
())
.unwrap
();
let
mut
p
=
p
.listen
()
.unwrap
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
client
=
p
.accept
()
.unwrap
();
let
mut
buf
=
[
0
];
assert
!
(
client
.read
(
buf
)
.unwrap
()
==
1
);
...
...
@@ -301,12 +299,11 @@ fn connect_fail() {
let
path
=
next_test_unix
();
let
path2
=
path
.clone
();
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
do
spawn
{
let
p
=
PipeListener
::
bind
(
local_loop
(),
&
path2
.to_c_str
())
.unwrap
();
let
mut
p
=
p
.listen
()
.unwrap
();
chan
.
take
()
.
send
(());
chan
.send
(());
p
.accept
();
}
port
.recv
();
...
...
src/librustuv/signal.rs
浏览文件 @
786dea20
...
...
@@ -76,7 +76,6 @@ fn drop(&mut self) {
#[cfg(test)]
mod
test
{
use
super
::
*
;
use
std
::
cell
::
Cell
;
use
super
::
super
::
local_loop
;
use
std
::
io
::
signal
;
use
std
::
comm
::{
SharedChan
,
stream
};
...
...
@@ -89,9 +88,8 @@ fn closing_channel_during_drop_doesnt_kill_everything() {
let
_
signal
=
SignalWatcher
::
new
(
local_loop
(),
signal
::
Interrupt
,
chan
);
let
port
=
Cell
::
new
(
port
);
do
spawn
{
port
.t
ake
()
.t
ry_recv
();
port
.try_recv
();
}
// when we drop the SignalWatcher we're going to destroy the channel,
...
...
src/librustuv/timer.rs
浏览文件 @
786dea20
...
...
@@ -163,7 +163,6 @@ fn drop(&mut self) {
#[cfg(test)]
mod
test
{
use
super
::
*
;
use
std
::
cell
::
Cell
;
use
std
::
rt
::
rtio
::
RtioTimer
;
use
super
::
super
::
local_loop
;
...
...
@@ -229,10 +228,10 @@ fn normal_fail() {
fn
closing_channel_during_drop_doesnt_kill_everything
()
{
// see issue #10375
let
mut
timer
=
TimerWatcher
::
new
(
local_loop
());
let
timer_port
=
Cell
::
new
(
timer
.period
(
1000
)
);
let
timer_port
=
timer
.period
(
1000
);
do
spawn
{
timer_port
.t
ake
()
.t
ry_recv
();
timer_port
.try_recv
();
}
// when we drop the TimerWatcher we're going to destroy the channel,
...
...
@@ -243,10 +242,10 @@ fn closing_channel_during_drop_doesnt_kill_everything() {
fn
reset_doesnt_switch_tasks
()
{
// similar test to the one above.
let
mut
timer
=
TimerWatcher
::
new
(
local_loop
());
let
timer_port
=
Cell
::
new
(
timer
.period
(
1000
)
);
let
timer_port
=
timer
.period
(
1000
);
do
spawn
{
timer_port
.t
ake
()
.t
ry_recv
();
timer_port
.try_recv
();
}
timer
.oneshot
(
1
);
...
...
@@ -255,10 +254,10 @@ fn reset_doesnt_switch_tasks() {
fn
reset_doesnt_switch_tasks2
()
{
// similar test to the one above.
let
mut
timer
=
TimerWatcher
::
new
(
local_loop
());
let
timer_port
=
Cell
::
new
(
timer
.period
(
1000
)
);
let
timer_port
=
timer
.period
(
1000
);
do
spawn
{
timer_port
.t
ake
()
.t
ry_recv
();
timer_port
.try_recv
();
}
timer
.sleep
(
1
);
...
...
src/libstd/io/net/tcp.rs
浏览文件 @
786dea20
...
...
@@ -146,7 +146,6 @@ fn accept(&mut self) -> Option<TcpStream> {
#[cfg(test)]
mod
test
{
use
super
::
*
;
use
cell
::
Cell
;
use
rt
::
test
::
*
;
use
io
::
net
::
ip
::{
Ipv4Addr
,
SocketAddr
};
use
io
::
*
;
...
...
@@ -196,12 +195,10 @@ fn smoke_test_ip4() {
do
run_in_mt_newsched_task
{
let
addr
=
next_test_ip4
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
stream
=
acceptor
.accept
();
let
mut
buf
=
[
0
];
stream
.read
(
buf
);
...
...
@@ -209,7 +206,7 @@ fn smoke_test_ip4() {
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
let
mut
stream
=
TcpStream
::
connect
(
addr
);
stream
.write
([
99
]);
}
...
...
@@ -221,12 +218,10 @@ fn smoke_test_ip6() {
do
run_in_mt_newsched_task
{
let
addr
=
next_test_ip6
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
stream
=
acceptor
.accept
();
let
mut
buf
=
[
0
];
stream
.read
(
buf
);
...
...
@@ -234,7 +229,7 @@ fn smoke_test_ip6() {
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
let
mut
stream
=
TcpStream
::
connect
(
addr
);
stream
.write
([
99
]);
}
...
...
@@ -246,12 +241,10 @@ fn read_eof_ip4() {
do
run_in_mt_newsched_task
{
let
addr
=
next_test_ip4
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
stream
=
acceptor
.accept
();
let
mut
buf
=
[
0
];
let
nread
=
stream
.read
(
buf
);
...
...
@@ -259,7 +252,7 @@ fn read_eof_ip4() {
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
let
_
stream
=
TcpStream
::
connect
(
addr
);
// Close
}
...
...
@@ -271,12 +264,10 @@ fn read_eof_ip6() {
do
run_in_mt_newsched_task
{
let
addr
=
next_test_ip6
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
stream
=
acceptor
.accept
();
let
mut
buf
=
[
0
];
let
nread
=
stream
.read
(
buf
);
...
...
@@ -284,7 +275,7 @@ fn read_eof_ip6() {
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
let
_
stream
=
TcpStream
::
connect
(
addr
);
// Close
}
...
...
@@ -296,12 +287,10 @@ fn read_eof_twice_ip4() {
do
run_in_mt_newsched_task
{
let
addr
=
next_test_ip4
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
stream
=
acceptor
.accept
();
let
mut
buf
=
[
0
];
let
nread
=
stream
.read
(
buf
);
...
...
@@ -319,7 +308,7 @@ fn read_eof_twice_ip4() {
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
let
_
stream
=
TcpStream
::
connect
(
addr
);
// Close
}
...
...
@@ -331,12 +320,10 @@ fn read_eof_twice_ip6() {
do
run_in_mt_newsched_task
{
let
addr
=
next_test_ip6
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
stream
=
acceptor
.accept
();
let
mut
buf
=
[
0
];
let
nread
=
stream
.read
(
buf
);
...
...
@@ -354,7 +341,7 @@ fn read_eof_twice_ip6() {
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
let
_
stream
=
TcpStream
::
connect
(
addr
);
// Close
}
...
...
@@ -366,12 +353,10 @@ fn write_close_ip4() {
do
run_in_mt_newsched_task
{
let
addr
=
next_test_ip4
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
stream
=
acceptor
.accept
();
let
buf
=
[
0
];
loop
{
...
...
@@ -392,7 +377,7 @@ fn write_close_ip4() {
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
let
_
stream
=
TcpStream
::
connect
(
addr
);
// Close
}
...
...
@@ -404,12 +389,10 @@ fn write_close_ip6() {
do
run_in_mt_newsched_task
{
let
addr
=
next_test_ip6
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
stream
=
acceptor
.accept
();
let
buf
=
[
0
];
loop
{
...
...
@@ -430,7 +413,7 @@ fn write_close_ip6() {
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
let
_
stream
=
TcpStream
::
connect
(
addr
);
// Close
}
...
...
@@ -443,12 +426,10 @@ fn multiple_connect_serial_ip4() {
let
addr
=
next_test_ip4
();
let
max
=
10
;
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
for
ref
mut
stream
in
acceptor
.incoming
()
.take
(
max
)
{
let
mut
buf
=
[
0
];
stream
.read
(
buf
);
...
...
@@ -457,7 +438,7 @@ fn multiple_connect_serial_ip4() {
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
max
.times
(||
{
let
mut
stream
=
TcpStream
::
connect
(
addr
);
stream
.write
([
99
]);
...
...
@@ -472,12 +453,10 @@ fn multiple_connect_serial_ip6() {
let
addr
=
next_test_ip6
();
let
max
=
10
;
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
for
ref
mut
stream
in
acceptor
.incoming
()
.take
(
max
)
{
let
mut
buf
=
[
0
];
stream
.read
(
buf
);
...
...
@@ -486,7 +465,7 @@ fn multiple_connect_serial_ip6() {
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
max
.times
(||
{
let
mut
stream
=
TcpStream
::
connect
(
addr
);
stream
.write
([
99
]);
...
...
@@ -501,16 +480,14 @@ fn multiple_connect_interleaved_greedy_schedule_ip4() {
let
addr
=
next_test_ip4
();
static
MAX
:
int
=
10
;
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
for
(
i
,
stream
)
in
acceptor
.incoming
()
.enumerate
()
.take
(
MAX
as
uint
)
{
let
stream
=
Cell
::
new
(
stream
);
// Start another task to handle the connection
do
spawntask
{
let
mut
stream
=
stream
.take
()
;
let
mut
stream
=
stream
;
let
mut
buf
=
[
0
];
stream
.read
(
buf
);
assert
!
(
buf
[
0
]
==
i
as
u8
);
...
...
@@ -543,16 +520,14 @@ fn multiple_connect_interleaved_greedy_schedule_ip6() {
let
addr
=
next_test_ip6
();
static
MAX
:
int
=
10
;
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
for
(
i
,
stream
)
in
acceptor
.incoming
()
.enumerate
()
.take
(
MAX
as
uint
)
{
let
stream
=
Cell
::
new
(
stream
);
// Start another task to handle the connection
do
spawntask
{
let
mut
stream
=
stream
.take
()
;
let
mut
stream
=
stream
;
let
mut
buf
=
[
0
];
stream
.read
(
buf
);
assert
!
(
buf
[
0
]
==
i
as
u8
);
...
...
@@ -585,16 +560,14 @@ fn multiple_connect_interleaved_lazy_schedule_ip4() {
let
addr
=
next_test_ip4
();
static
MAX
:
int
=
10
;
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
for
stream
in
acceptor
.incoming
()
.take
(
MAX
as
uint
)
{
let
stream
=
Cell
::
new
(
stream
);
// Start another task to handle the connection
do
spawntask_later
{
let
mut
stream
=
stream
.take
()
;
let
mut
stream
=
stream
;
let
mut
buf
=
[
0
];
stream
.read
(
buf
);
assert
!
(
buf
[
0
]
==
99
);
...
...
@@ -626,16 +599,14 @@ fn multiple_connect_interleaved_lazy_schedule_ip6() {
let
addr
=
next_test_ip6
();
static
MAX
:
int
=
10
;
let
(
port
,
chan
)
=
oneshot
();
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
for
stream
in
acceptor
.incoming
()
.take
(
MAX
as
uint
)
{
let
stream
=
Cell
::
new
(
stream
);
// Start another task to handle the connection
do
spawntask_later
{
let
mut
stream
=
stream
.take
()
;
let
mut
stream
=
stream
;
let
mut
buf
=
[
0
];
stream
.read
(
buf
);
assert
!
(
buf
[
0
]
==
99
);
...
...
@@ -682,18 +653,16 @@ fn socket_name(addr: SocketAddr) {
fn
peer_name
(
addr
:
SocketAddr
)
{
do
run_in_mt_newsched_task
{
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
TcpListener
::
bind
(
addr
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
acceptor
.accept
();
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
let
stream
=
TcpStream
::
connect
(
addr
);
assert
!
(
stream
.is_some
());
...
...
src/libstd/io/net/udp.rs
浏览文件 @
786dea20
...
...
@@ -110,7 +110,6 @@ mod test {
use
io
::
*
;
use
option
::{
Some
,
None
};
use
rt
::
comm
::
oneshot
;
use
cell
::
Cell
;
#[test]
#[ignore]
fn
bind_error
()
{
...
...
@@ -134,13 +133,11 @@ fn socket_smoke_test_ip4() {
let
server_ip
=
next_test_ip4
();
let
client_ip
=
next_test_ip4
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
match
UdpSocket
::
bind
(
server_ip
)
{
Some
(
ref
mut
server
)
=>
{
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
buf
=
[
0
];
match
server
.recvfrom
(
buf
)
{
Some
((
nread
,
src
))
=>
{
...
...
@@ -158,7 +155,7 @@ fn socket_smoke_test_ip4() {
do
spawntask
{
match
UdpSocket
::
bind
(
client_ip
)
{
Some
(
ref
mut
client
)
=>
{
port
.
take
()
.
recv
();
port
.recv
();
client
.sendto
([
99
],
server_ip
)
}
None
=>
fail
!
()
...
...
@@ -173,13 +170,11 @@ fn socket_smoke_test_ip6() {
let
server_ip
=
next_test_ip6
();
let
client_ip
=
next_test_ip6
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
match
UdpSocket
::
bind
(
server_ip
)
{
Some
(
ref
mut
server
)
=>
{
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
buf
=
[
0
];
match
server
.recvfrom
(
buf
)
{
Some
((
nread
,
src
))
=>
{
...
...
@@ -197,7 +192,7 @@ fn socket_smoke_test_ip6() {
do
spawntask
{
match
UdpSocket
::
bind
(
client_ip
)
{
Some
(
ref
mut
client
)
=>
{
port
.
take
()
.
recv
();
port
.recv
();
client
.sendto
([
99
],
server_ip
)
}
None
=>
fail
!
()
...
...
@@ -212,15 +207,13 @@ fn stream_smoke_test_ip4() {
let
server_ip
=
next_test_ip4
();
let
client_ip
=
next_test_ip4
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
match
UdpSocket
::
bind
(
server_ip
)
{
Some
(
server
)
=>
{
let
server
=
~
server
;
let
mut
stream
=
server
.connect
(
client_ip
);
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
buf
=
[
0
];
match
stream
.read
(
buf
)
{
Some
(
nread
)
=>
{
...
...
@@ -239,7 +232,7 @@ fn stream_smoke_test_ip4() {
Some
(
client
)
=>
{
let
client
=
~
client
;
let
mut
stream
=
client
.connect
(
server_ip
);
port
.
take
()
.
recv
();
port
.recv
();
stream
.write
([
99
]);
}
None
=>
fail
!
()
...
...
@@ -254,15 +247,13 @@ fn stream_smoke_test_ip6() {
let
server_ip
=
next_test_ip6
();
let
client_ip
=
next_test_ip6
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
match
UdpSocket
::
bind
(
server_ip
)
{
Some
(
server
)
=>
{
let
server
=
~
server
;
let
mut
stream
=
server
.connect
(
client_ip
);
chan
.
take
()
.
send
(());
chan
.send
(());
let
mut
buf
=
[
0
];
match
stream
.read
(
buf
)
{
Some
(
nread
)
=>
{
...
...
@@ -281,7 +272,7 @@ fn stream_smoke_test_ip6() {
Some
(
client
)
=>
{
let
client
=
~
client
;
let
mut
stream
=
client
.connect
(
server_ip
);
port
.
take
()
.
recv
();
port
.recv
();
stream
.write
([
99
]);
}
None
=>
fail
!
()
...
...
src/libstd/io/net/unix.rs
浏览文件 @
786dea20
...
...
@@ -152,32 +152,26 @@ fn accept(&mut self) -> Option<UnixStream> {
mod
tests
{
use
prelude
::
*
;
use
super
::
*
;
use
cell
::
Cell
;
use
rt
::
test
::
*
;
use
io
::
*
;
use
rt
::
comm
::
oneshot
;
fn
smalltest
(
server
:
proc
(
UnixStream
),
client
:
proc
(
UnixStream
))
{
let
server
=
Cell
::
new
(
server
);
let
client
=
Cell
::
new
(
client
);
do
run_in_mt_newsched_task
{
let
server
=
Cell
::
new
(
server
.take
());
let
client
=
Cell
::
new
(
client
.take
());
let
path1
=
next_test_unix
();
let
path2
=
path1
.clone
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
let
(
client
,
server
)
=
(
client
,
server
);
do
spawntask
{
let
mut
acceptor
=
UnixListener
::
bind
(
&
path1
)
.listen
();
chan
.
take
()
.
send
(());
server
.take
()
(
acceptor
.accept
()
.unwrap
());
chan
.send
(());
server
(
acceptor
.accept
()
.unwrap
());
}
do
spawntask
{
port
.
take
()
.
recv
();
client
.take
()
(
UnixStream
::
connect
(
&
path2
)
.unwrap
());
port
.recv
();
client
(
UnixStream
::
connect
(
&
path2
)
.unwrap
());
}
}
}
...
...
@@ -260,12 +254,10 @@ fn accept_lots() {
let
path1
=
next_test_unix
();
let
path2
=
path1
.clone
();
let
(
port
,
chan
)
=
oneshot
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
do
spawntask
{
let
mut
acceptor
=
UnixListener
::
bind
(
&
path1
)
.listen
();
chan
.
take
()
.
send
(());
chan
.send
(());
times
.times
(||
{
let
mut
client
=
acceptor
.accept
();
let
mut
buf
=
[
0
];
...
...
@@ -275,7 +267,7 @@ fn accept_lots() {
}
do
spawntask
{
port
.
take
()
.
recv
();
port
.recv
();
times
.times
(||
{
let
mut
stream
=
UnixStream
::
connect
(
&
path2
);
stream
.write
([
100
]);
...
...
src/libstd/rt/comm.rs
浏览文件 @
786dea20
...
...
@@ -843,9 +843,8 @@ fn oneshot_single_thread_peek_open() {
fn
oneshot_multi_task_recv_then_send
()
{
do
run_in_newsched_task
{
let
(
port
,
chan
)
=
oneshot
::
<~
int
>
();
let
port_cell
=
Cell
::
new
(
port
);
do
spawntask
{
assert
!
(
port
_cell
.take
()
.recv
()
==
~
10
);
assert
!
(
port
.recv
()
==
~
10
);
}
chan
.send
(
~
10
);
...
...
@@ -856,13 +855,11 @@ fn oneshot_multi_task_recv_then_send() {
fn
oneshot_multi_task_recv_then_close
()
{
do
run_in_newsched_task
{
let
(
port
,
chan
)
=
oneshot
::
<~
int
>
();
let
port_cell
=
Cell
::
new
(
port
);
let
chan_cell
=
Cell
::
new
(
chan
);
do
spawntask_later
{
let
_
cell
=
chan_cell
.take
()
;
let
_
=
chan
;
}
let
res
=
do
spawntask_try
{
assert
!
(
port
_cell
.take
()
.recv
()
==
~
10
);
assert
!
(
port
.recv
()
==
~
10
);
};
assert
!
(
res
.is_err
());
}
...
...
@@ -874,9 +871,8 @@ fn oneshot_multi_thread_close_stress() {
stress_factor
()
.times
(||
{
do
run_in_newsched_task
{
let
(
port
,
chan
)
=
oneshot
::
<
int
>
();
let
port_cell
=
Cell
::
new
(
port
);
let
thread
=
do
spawntask_thread
{
let
_
p
=
port_cell
.take
()
;
let
_
=
port
;
};
let
_
chan
=
chan
;
thread
.join
();
...
...
@@ -890,14 +886,11 @@ fn oneshot_multi_thread_send_close_stress() {
stress_factor
()
.times
(||
{
do
run_in_newsched_task
{
let
(
port
,
chan
)
=
oneshot
::
<
int
>
();
let
chan_cell
=
Cell
::
new
(
chan
);
let
port_cell
=
Cell
::
new
(
port
);
let
thread1
=
do
spawntask_thread
{
let
_
p
=
port_cell
.take
()
;
let
_
=
port
;
};
let
thread2
=
do
spawntask_thread
{
let
c
=
chan_cell
.take
();
c
.send
(
1
);
chan
.send
(
1
);
};
thread1
.join
();
thread2
.join
();
...
...
@@ -911,19 +904,17 @@ fn oneshot_multi_thread_recv_close_stress() {
stress_factor
()
.times
(||
{
do
run_in_newsched_task
{
let
(
port
,
chan
)
=
oneshot
::
<
int
>
();
let
chan_cell
=
Cell
::
new
(
chan
);
let
port_cell
=
Cell
::
new
(
port
);
let
thread1
=
do
spawntask_thread
{
let
port
_cell
=
Cell
::
new
(
port_cell
.take
())
;
let
port
=
port
;
let
res
=
do
spawntask_try
{
port
_cell
.take
()
.recv
();
port
.recv
();
};
assert
!
(
res
.is_err
());
};
let
thread2
=
do
spawntask_thread
{
let
chan
_cell
=
Cell
::
new
(
chan_cell
.take
())
;
let
chan
=
chan
;
do
spawntask
{
chan_cell
.take
()
;
let
_
=
chan
;
}
};
thread1
.join
();
...
...
@@ -938,13 +929,11 @@ fn oneshot_multi_thread_send_recv_stress() {
stress_factor
()
.times
(||
{
do
run_in_newsched_task
{
let
(
port
,
chan
)
=
oneshot
::
<~
int
>
();
let
chan_cell
=
Cell
::
new
(
chan
);
let
port_cell
=
Cell
::
new
(
port
);
let
thread1
=
do
spawntask_thread
{
chan
_cell
.take
()
.send
(
~
10
);
chan
.send
(
~
10
);
};
let
thread2
=
do
spawntask_thread
{
assert
!
(
port
_cell
.take
()
.recv
()
==
~
10
);
assert
!
(
port
.recv
()
==
~
10
);
};
thread1
.join
();
thread2
.join
();
...
...
@@ -965,9 +954,7 @@ fn stream_send_recv_stress() {
fn
send
(
chan
:
Chan
<~
int
>
,
i
:
int
)
{
if
i
==
10
{
return
}
let
chan_cell
=
Cell
::
new
(
chan
);
do
spawntask_random
{
let
chan
=
chan_cell
.take
();
chan
.send
(
~
i
);
send
(
chan
,
i
+
1
);
}
...
...
@@ -976,9 +963,7 @@ fn send(chan: Chan<~int>, i: int) {
fn
recv
(
port
:
Port
<~
int
>
,
i
:
int
)
{
if
i
==
10
{
return
}
let
port_cell
=
Cell
::
new
(
port
);
do
spawntask_random
{
let
port
=
port_cell
.take
();
assert
!
(
port
.recv
()
==
~
i
);
recv
(
port
,
i
+
1
);
};
...
...
@@ -1141,14 +1126,11 @@ fn send_deferred() {
let
cshared
=
SharedChan
::
new
(
cshared
);
let
mp
=
megapipe
();
let
pone
=
Cell
::
new
(
pone
);
do
spawntask
{
pone
.take
()
.recv
();
}
let
pstream
=
Cell
::
new
(
pstream
);
do
spawntask
{
pstream
.take
()
.recv
();
}
let
pshared
=
Cell
::
new
(
pshared
);
do
spawntask
{
pshared
.take
()
.recv
();
}
let
p_mp
=
Cell
::
new
(
mp
.clone
());
do
spawntask
{
p_mp
.take
()
.recv
();
}
do
spawntask
{
pone
.recv
();
}
do
spawntask
{
pstream
.recv
();
}
do
spawntask
{
pshared
.recv
();
}
let
p_mp
=
mp
.clone
();
do
spawntask
{
p_mp
.recv
();
}
let
cs
=
Cell
::
new
((
cone
,
cstream
,
cshared
,
mp
));
unsafe
{
...
...
src/libstd/rt/kill.rs
浏览文件 @
786dea20
...
...
@@ -151,7 +151,6 @@ fn test_something_in_another_task {
*/
use
cast
;
use
cell
::
Cell
;
use
option
::{
Option
,
Some
,
None
};
use
prelude
::
*
;
use
rt
::
task
::
Task
;
...
...
@@ -256,8 +255,10 @@ pub fn new() -> Death {
/// Collect failure exit codes from children and propagate them to a parent.
pub
fn
collect_failure
(
&
mut
self
,
result
:
UnwindResult
)
{
let
result
=
Cell
::
new
(
result
);
self
.on_exit
.take
()
.map
(|
on_exit
|
on_exit
(
result
.take
()));
match
self
.on_exit
.take
()
{
None
=>
{}
Some
(
on_exit
)
=>
on_exit
(
result
),
}
}
/// Enter a possibly-nested "atomic" section of code. Just for assertions.
...
...
src/libstd/rt/mod.rs
浏览文件 @
786dea20
...
...
@@ -57,7 +57,6 @@
// XXX: this should not be here.
#[allow(missing_doc)]
;
use
cell
::
Cell
;
use
clone
::
Clone
;
use
container
::
Container
;
use
iter
::
Iterator
;
...
...
@@ -274,7 +273,7 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
let
nscheds
=
util
::
default_sched_threads
();
let
m
ain
=
Cell
::
new
(
main
);
let
m
ut
main
=
Some
(
main
);
// The shared list of sleeping schedulers.
let
sleepers
=
SleeperList
::
new
();
...
...
@@ -376,24 +375,24 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
};
let
mut
threads
=
~
[];
let
on_exit
=
Cell
::
new
(
on_exit
);
let
mut
on_exit
=
Some
(
on_exit
);
if
!
use_main_sched
{
// In the case where we do not use a main_thread scheduler we
// run the main task in one of our threads.
let
mut
main_task
=
~
Task
::
new_root
(
&
mut
scheds
[
0
]
.stack_pool
,
None
,
main
.take
());
let
mut
main_task
=
~
Task
::
new_root
(
&
mut
scheds
[
0
]
.stack_pool
,
None
,
::
util
::
replace
(
&
mut
main
,
None
)
.unwrap
());
main_task
.name
=
Some
(
SendStrStatic
(
"<main>"
));
main_task
.death.on_exit
=
Some
(
on_exit
.take
());
let
main_task_cell
=
Cell
::
new
(
main_task
);
main_task
.death.on_exit
=
::
util
::
replace
(
&
mut
on_exit
,
None
);
let
sched
=
scheds
.pop
();
let
sched_cell
=
Cell
::
new
(
sched
)
;
let
main_task
=
main_task
;
let
thread
=
do
Thread
::
start
{
let
sched
=
sched_cell
.take
();
sched
.bootstrap
(
main_task_cell
.take
());
sched
.bootstrap
(
main_task
);
};
threads
.push
(
thread
);
}
...
...
@@ -401,9 +400,8 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
// Run each remaining scheduler in a thread.
for
sched
in
scheds
.move_rev_iter
()
{
rtdebug!
(
"creating regular schedulers"
);
let
sched_cell
=
Cell
::
new
(
sched
);
let
thread
=
do
Thread
::
start
{
let
mut
sched
=
sched
_cell
.take
()
;
let
mut
sched
=
sched
;
let
bootstrap_task
=
~
do
Task
::
new_root
(
&
mut
sched
.stack_pool
,
None
)
||
{
rtdebug!
(
"boostraping a non-primary scheduler"
);
};
...
...
@@ -415,16 +413,19 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
// If we do have a main thread scheduler, run it now.
if
use_main_sched
{
rtdebug!
(
"about to create the main scheduler task"
);
let
mut
main_sched
=
main_sched
.unwrap
();
let
home
=
Sched
(
main_sched
.make_handle
());
let
mut
main_task
=
~
Task
::
new_root_homed
(
&
mut
main_sched
.stack_pool
,
None
,
home
,
main
.take
());
let
mut
main_task
=
~
Task
::
new_root_homed
(
&
mut
main_sched
.stack_pool
,
None
,
home
,
::
util
::
replace
(
&
mut
main
,
None
)
.
unwrap
());
main_task
.name
=
Some
(
SendStrStatic
(
"<main>"
));
main_task
.death.on_exit
=
Some
(
on_exit
.take
()
);
main_task
.death.on_exit
=
::
util
::
replace
(
&
mut
on_exit
,
None
);
rtdebug!
(
"bootstrapping main_task"
);
main_sched
.bootstrap
(
main_task
);
...
...
src/libstd/rt/sched.rs
浏览文件 @
786dea20
...
...
@@ -924,7 +924,6 @@ mod test {
use
unstable
::
run_in_bare_thread
;
use
borrow
::
to_uint
;
use
rt
::
sched
::{
Scheduler
};
use
cell
::
Cell
;
use
rt
::
deque
::
BufferPool
;
use
rt
::
thread
::
Thread
;
use
rt
::
task
::{
Task
,
Sched
};
...
...
@@ -1050,7 +1049,7 @@ fn test_schedule_home_states() {
queues
.clone
(),
sleepers
.clone
());
let
normal_handle
=
Cell
::
new
(
normal_sched
.make_handle
()
);
let
normal_handle
=
normal_sched
.make_handle
(
);
let
friend_handle
=
normal_sched
.make_handle
();
...
...
@@ -1063,7 +1062,7 @@ fn test_schedule_home_states() {
false
,
Some
(
friend_handle
));
let
special_handle
=
Cell
::
new
(
special_sched
.make_handle
()
);
let
special_handle
=
special_sched
.make_handle
(
);
let
t1_handle
=
special_sched
.make_handle
();
let
t4_handle
=
special_sched
.make_handle
();
...
...
@@ -1094,26 +1093,19 @@ fn test_schedule_home_states() {
};
rtdebug!
(
"task4 id: **{}**"
,
borrow
::
to_uint
(
task4
));
let
task1
=
Cell
::
new
(
task1
);
let
task2
=
Cell
::
new
(
task2
);
let
task3
=
Cell
::
new
(
task3
);
let
task4
=
Cell
::
new
(
task4
);
// Signal from the special task that we are done.
let
(
port
,
chan
)
=
oneshot
::
<
()
>
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
let
normal_task
=
~
do
Task
::
new_root
(
&
mut
normal_sched
.stack_pool
,
None
)
{
rtdebug!
(
"*about to submit task2*"
);
Scheduler
::
run_task
(
task2
.take
()
);
Scheduler
::
run_task
(
task2
);
rtdebug!
(
"*about to submit task4*"
);
Scheduler
::
run_task
(
task4
.take
()
);
Scheduler
::
run_task
(
task4
);
rtdebug!
(
"*normal_task done*"
);
port
.
take
()
.
recv
();
let
mut
nh
=
normal_handle
.take
()
;
port
.recv
();
let
mut
nh
=
normal_handle
;
nh
.send
(
Shutdown
);
let
mut
sh
=
special_handle
.take
()
;
let
mut
sh
=
special_handle
;
sh
.send
(
Shutdown
);
};
...
...
@@ -1121,27 +1113,24 @@ fn test_schedule_home_states() {
let
special_task
=
~
do
Task
::
new_root
(
&
mut
special_sched
.stack_pool
,
None
)
{
rtdebug!
(
"*about to submit task1*"
);
Scheduler
::
run_task
(
task1
.take
()
);
Scheduler
::
run_task
(
task1
);
rtdebug!
(
"*about to submit task3*"
);
Scheduler
::
run_task
(
task3
.take
()
);
Scheduler
::
run_task
(
task3
);
rtdebug!
(
"*done with special_task*"
);
chan
.
take
()
.
send
(());
chan
.send
(());
};
rtdebug!
(
"special task: {}"
,
borrow
::
to_uint
(
special_task
));
let
special_sched
=
Cell
::
new
(
special_sched
);
let
normal_sched
=
Cell
::
new
(
normal_sched
);
let
special_task
=
Cell
::
new
(
special_task
);
let
normal_task
=
Cell
::
new
(
normal_task
);
let
normal_sched
=
normal_sched
;
let
normal_thread
=
do
Thread
::
start
{
normal_sched
.
take
()
.bootstrap
(
normal_task
.take
()
);
normal_sched
.
bootstrap
(
normal_task
);
rtdebug!
(
"finished with normal_thread"
);
};
let
special_sched
=
special_sched
;
let
special_thread
=
do
Thread
::
start
{
special_sched
.
take
()
.bootstrap
(
special_task
.take
()
);
special_sched
.
bootstrap
(
special_task
);
rtdebug!
(
"finished with special_sched"
);
};
...
...
@@ -1180,20 +1169,18 @@ fn handle() {
do
run_in_bare_thread
{
let
(
port
,
chan
)
=
oneshot
::
<
()
>
();
let
port
=
Cell
::
new
(
port
);
let
chan
=
Cell
::
new
(
chan
);
let
thread_one
=
do
Thread
::
start
{
let
chan
=
Cell
::
new
(
chan
.take
())
;
let
chan
=
chan
;
do
run_in_newsched_task_core
{
chan
.
take
()
.
send
(());
chan
.send
(());
}
};
let
thread_two
=
do
Thread
::
start
{
let
port
=
Cell
::
new
(
port
.take
())
;
let
port
=
port
;
do
run_in_newsched_task_core
{
port
.
take
()
.
recv
();
port
.recv
();
}
};
...
...
@@ -1224,10 +1211,9 @@ fn no_missed_messages() {
let
mut
handle
=
sched
.make_handle
();
let
sched
=
Cell
::
new
(
sched
);
let
sched
=
sched
;
let
thread
=
do
Thread
::
start
{
let
mut
sched
=
sched
.take
()
;
let
mut
sched
=
sched
;
let
bootstrap_task
=
~
Task
::
new_root
(
&
mut
sched
.stack_pool
,
None
,
...
...
@@ -1258,9 +1244,8 @@ fn multithreading() {
let
mut
ports
=
~
[];
10
.times
(||
{
let
(
port
,
chan
)
=
oneshot
();
let
chan_cell
=
Cell
::
new
(
chan
);
do
spawntask_later
{
chan
_cell
.take
()
.send
(());
chan
.send
(());
}
ports
.push
(
port
);
});
...
...
src/libstd/rt/test.rs
浏览文件 @
786dea20
...
...
@@ -10,7 +10,6 @@
use
io
::
net
::
ip
::{
SocketAddr
,
Ipv4Addr
,
Ipv6Addr
};
use
cell
::
Cell
;
use
clone
::
Clone
;
use
container
::
Container
;
use
iter
::{
Iterator
,
range
};
...
...
@@ -65,16 +64,14 @@ pub fn new_test_sched() -> Scheduler {
}
pub
fn
run_in_uv_task
(
f
:
proc
())
{
let
f
=
Cell
::
new
(
f
);
do
run_in_bare_thread
{
run_in_uv_task_core
(
f
.take
()
);
run_in_uv_task_core
(
f
);
}
}
pub
fn
run_in_newsched_task
(
f
:
proc
())
{
let
f
=
Cell
::
new
(
f
);
do
run_in_bare_thread
{
run_in_newsched_task_core
(
f
.take
()
);
run_in_newsched_task_core
(
f
);
}
}
...
...
@@ -206,8 +203,6 @@ pub fn run_in_mt_newsched_task(f: proc()) {
// see comment in other function (raising fd limits)
prepare_for_lots_of_tests
();
let
f
=
Cell
::
new
(
f
);
do
run_in_bare_thread
{
let
nthreads
=
match
os
::
getenv
(
"RUST_RT_TEST_THREADS"
)
{
Some
(
nstr
)
=>
FromStr
::
from_str
(
nstr
)
.unwrap
(),
...
...
@@ -254,18 +249,18 @@ pub fn run_in_mt_newsched_task(f: proc()) {
rtassert!
(
exit_status
.is_success
());
};
let
mut
main_task
=
~
Task
::
new_root
(
&
mut
scheds
[
0
]
.stack_pool
,
None
,
f
.take
());
let
mut
main_task
=
~
Task
::
new_root
(
&
mut
scheds
[
0
]
.stack_pool
,
None
,
f
);
main_task
.death.on_exit
=
Some
(
on_exit
);
let
mut
threads
=
~
[];
let
main_task
=
Cell
::
new
(
main_task
);
let
main_thread
=
{
let
sched
=
scheds
.pop
();
let
sched_cell
=
Cell
::
new
(
sched
)
;
let
main_task
=
main_task
;
do
Thread
::
start
{
let
sched
=
sched_cell
.take
();
sched
.bootstrap
(
main_task
.take
());
sched
.bootstrap
(
main_task
);
}
};
threads
.push
(
main_thread
);
...
...
@@ -275,11 +270,9 @@ pub fn run_in_mt_newsched_task(f: proc()) {
let
bootstrap_task
=
~
do
Task
::
new_root
(
&
mut
sched
.stack_pool
,
None
)
||
{
rtdebug!
(
"bootstrapping non-primary scheduler"
);
};
let
bootstrap_task_cell
=
Cell
::
new
(
bootstrap_task
);
let
sched_cell
=
Cell
::
new
(
sched
);
let
sched
=
sched
;
let
thread
=
do
Thread
::
start
{
let
sched
=
sched_cell
.take
();
sched
.bootstrap
(
bootstrap_task_cell
.take
());
sched
.bootstrap
(
bootstrap_task
);
};
threads
.push
(
thread
);
...
...
@@ -335,11 +328,8 @@ pub fn spawntask_try(f: proc()) -> Result<(),()> {
/// Spawn a new task in a new scheduler and return a thread handle.
pub
fn
spawntask_thread
(
f
:
proc
())
->
Thread
<
()
>
{
let
f
=
Cell
::
new
(
f
);
let
thread
=
do
Thread
::
start
{
run_in_newsched_task_core
(
f
.take
()
);
run_in_newsched_task_core
(
f
);
};
return
thread
;
...
...
src/libstd/task/mod.rs
浏览文件 @
786dea20
...
...
@@ -55,7 +55,6 @@
use
prelude
::
*
;
use
cell
::
Cell
;
use
comm
::{
stream
,
Chan
,
GenericChan
,
GenericPort
,
Port
,
Peekable
};
use
result
::{
Result
,
Ok
,
Err
};
use
rt
::
in_green_task_context
;
...
...
@@ -284,10 +283,8 @@ pub fn add_wrapper(&mut self, wrapper: proc(v: proc()) -> proc()) {
f
}
};
let
prev_gen_body
=
Cell
::
new
(
prev_gen_body
);
let
next_gen_body
=
{
let
f
:
proc
(
proc
())
->
proc
()
=
proc
(
body
)
{
let
prev_gen_body
=
prev_gen_body
.take
();
wrapper
(
prev_gen_body
(
body
))
};
f
...
...
@@ -548,11 +545,9 @@ struct Wrapper {
fn
test_add_wrapper
()
{
let
(
po
,
ch
)
=
stream
::
<
()
>
();
let
mut
b0
=
task
();
let
ch
=
Cell
::
new
(
ch
);
do
b0
.add_wrapper
|
body
|
{
let
ch
=
Cell
::
new
(
ch
.take
())
;
let
ch
=
ch
;
let
result
:
proc
()
=
proc
()
{
let
ch
=
ch
.take
();
body
();
ch
.send
(());
};
...
...
@@ -642,12 +637,10 @@ fn test_spawn_sched_childs_on_default_sched() {
// Assuming tests run on the default scheduler
let
default_id
=
get_sched_id
();
let
ch
=
Cell
::
new
(
ch
);
do
spawn_sched
(
SingleThreaded
)
{
let
parent_sched_id
=
get_sched_id
();
let
ch
=
Cell
::
new
(
ch
.take
())
;
let
ch
=
ch
;
do
spawn
{
let
ch
=
ch
.take
();
let
child_sched_id
=
get_sched_id
();
assert
!
(
parent_sched_id
!=
child_sched_id
);
assert_eq!
(
child_sched_id
,
default_id
);
...
...
@@ -671,10 +664,10 @@ fn test_spawn_sched_blocking() {
let
(
fin_po
,
fin_ch
)
=
stream
();
let
mut
lock
=
Mutex
::
new
();
let
lock2
=
Cell
::
new
(
lock
.clone
()
);
let
lock2
=
lock
.clone
(
);
do
spawn_sched
(
SingleThreaded
)
{
let
mut
lock
=
lock2
.take
()
;
let
mut
lock
=
lock2
;
lock
.lock
();
start_ch
.send
(());
...
...
src/libstd/task/spawn.rs
浏览文件 @
786dea20
...
...
@@ -77,7 +77,6 @@
use
prelude
::
*
;
use
cell
::
Cell
;
use
comm
::{
GenericChan
,
oneshot
};
use
rt
::
local
::
Local
;
use
rt
::
sched
::{
Scheduler
,
Shutdown
,
TaskFromFriend
};
...
...
@@ -134,23 +133,19 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
// Create a task that will later be used to join with the new scheduler
// thread when it is ready to terminate
let
(
thread_port
,
thread_chan
)
=
oneshot
();
let
thread_port_cell
=
Cell
::
new
(
thread_port
);
let
join_task
=
do
Task
::
build_child
(
None
)
{
debug!
(
"running join task"
);
let
thread_port
=
thread_port_cell
.take
();
let
thread
:
Thread
<
()
>
=
thread_port
.recv
();
thread
.join
();
};
// Put the scheduler into another thread
let
new_sched_cell
=
Cell
::
new
(
new_sched
);
let
orig_sched_handle_cell
=
Cell
::
new
((
*
sched
)
.make_handle
());
let
join_task_cell
=
Cell
::
new
(
join_task
);
let
orig_sched_handle
=
(
*
sched
)
.make_handle
();
let
new_sched
=
new_sched
;
let
thread
=
do
Thread
::
start
{
let
mut
new_sched
=
new_sched_cell
.take
();
let
mut
orig_sched_handle
=
orig_sched_handle_cell
.take
();
let
join_task
=
join_task_cell
.take
();
let
mut
new_sched
=
new_sched
;
let
mut
orig_sched_handle
=
orig_sched_handle
;
let
bootstrap_task
=
~
do
Task
::
new_root
(
&
mut
new_sched
.stack_pool
,
None
)
||
{
debug!
(
"boostrapping a 1:1 scheduler"
);
...
...
@@ -178,9 +173,8 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
if
opts
.notify_chan
.is_some
()
{
let
notify_chan
=
opts
.notify_chan
.take_unwrap
();
let
notify_chan
=
Cell
::
new
(
notify_chan
);
let
on_exit
:
proc
(
UnwindResult
)
=
proc
(
task_result
)
{
notify_chan
.
take
()
.
send
(
task_result
)
notify_chan
.send
(
task_result
)
};
task
.death.on_exit
=
Some
(
on_exit
);
}
...
...
src/libstd/unstable/mod.rs
浏览文件 @
786dea20
...
...
@@ -37,15 +37,13 @@
a normal large stack.
*/
pub
fn
run_in_bare_thread
(
f
:
proc
())
{
use
cell
::
Cell
;
use
rt
::
thread
::
Thread
;
let
f_cell
=
Cell
::
new
(
f
);
let
(
port
,
chan
)
=
comm
::
stream
();
// FIXME #4525: Unfortunate that this creates an extra scheduler but it's
// necessary since rust_raw_thread_join is blocking
do
task
::
spawn_sched
(
task
::
SingleThreaded
)
{
Thread
::
start
(
f
_cell
.take
()
)
.join
();
Thread
::
start
(
f
)
.join
();
chan
.send
(());
}
port
.recv
();
...
...
src/test/bench/msgsend-ring-mutex-arcs.rs
浏览文件 @
786dea20
...
...
@@ -20,7 +20,6 @@
use
extra
::
arc
;
use
extra
::
future
::
Future
;
use
extra
::
time
;
use
std
::
cell
::
Cell
;
use
std
::
os
;
use
std
::
uint
;
...
...
@@ -91,12 +90,8 @@ fn main() {
for
i
in
range
(
1u
,
num_tasks
)
{
//error!("spawning %?", i);
let
(
new_chan
,
num_port
)
=
init
();
let
num_chan2
=
Cell
::
new
(
num_chan
);
let
num_port
=
Cell
::
new
(
num_port
);
let
new_future
=
do
Future
::
spawn
()
{
let
num_chan
=
num_chan2
.take
();
let
num_port1
=
num_port
.take
();
thread_ring
(
i
,
msg_per_task
,
num_chan
,
num_port1
)
thread_ring
(
i
,
msg_per_task
,
num_chan
,
num_port
)
};
futures
.push
(
new_future
);
num_chan
=
new_chan
;
...
...
src/test/bench/msgsend-ring-rw-arcs.rs
浏览文件 @
786dea20
...
...
@@ -20,7 +20,6 @@
use
extra
::
arc
;
use
extra
::
future
::
Future
;
use
extra
::
time
;
use
std
::
cell
::
Cell
;
use
std
::
os
;
use
std
::
uint
;
...
...
@@ -87,12 +86,8 @@ fn main() {
for
i
in
range
(
1u
,
num_tasks
)
{
//error!("spawning %?", i);
let
(
new_chan
,
num_port
)
=
init
();
let
num_chan2
=
Cell
::
new
(
num_chan
);
let
num_port
=
Cell
::
new
(
num_port
);
let
new_future
=
do
Future
::
spawn
{
let
num_chan
=
num_chan2
.take
();
let
num_port1
=
num_port
.take
();
thread_ring
(
i
,
msg_per_task
,
num_chan
,
num_port1
)
thread_ring
(
i
,
msg_per_task
,
num_chan
,
num_port
)
};
futures
.push
(
new_future
);
num_chan
=
new_chan
;
...
...
src/test/bench/rt-messaging-ping-pong.rs
浏览文件 @
786dea20
...
...
@@ -13,7 +13,6 @@
use
std
::
os
;
use
std
::
uint
;
use
std
::
rt
::
test
::
spawntask_later
;
use
std
::
cell
::
Cell
;
// This is a simple bench that creates M pairs of of tasks. These
// tasks ping-pong back and forth over a pair of streams. This is a
...
...
@@ -24,19 +23,14 @@ fn ping_pong_bench(n: uint, m: uint) {
// Create pairs of tasks that pingpong back and forth.
fn
run_pair
(
n
:
uint
)
{
// Create a stream A->B
let
(
pa
,
ca
)
=
stream
::
<
()
>
();
// Create a stream B->A
let
(
pb
,
cb
)
=
stream
::
<
()
>
();
let
pa
=
Cell
::
new
(
pa
);
let
ca
=
Cell
::
new
(
ca
);
let
pb
=
Cell
::
new
(
pb
);
let
cb
=
Cell
::
new
(
cb
);
// Create a stream A->B
let
(
pa
,
ca
)
=
stream
::
<
()
>
();
// Create a stream B->A
let
(
pb
,
cb
)
=
stream
::
<
()
>
();
do
spawntask_later
()
||
{
let
chan
=
ca
.take
()
;
let
port
=
pb
.take
()
;
let
chan
=
ca
;
let
port
=
pb
;
n
.times
(||
{
chan
.send
(());
port
.recv
();
...
...
@@ -44,8 +38,8 @@ fn run_pair(n: uint) {
}
do
spawntask_later
()
||
{
let
chan
=
cb
.take
()
;
let
port
=
pa
.take
()
;
let
chan
=
cb
;
let
port
=
pa
;
n
.times
(||
{
port
.recv
();
chan
.send
(());
...
...
src/test/bench/rt-parfib.rs
浏览文件 @
786dea20
...
...
@@ -13,7 +13,6 @@
use
std
::
os
;
use
std
::
uint
;
use
std
::
rt
::
test
::
spawntask_later
;
use
std
::
cell
::
Cell
;
use
std
::
comm
::
oneshot
;
// A simple implementation of parfib. One subtree is found in a new
...
...
@@ -26,9 +25,8 @@ fn parfib(n: uint) -> uint {
}
let
(
port
,
chan
)
=
oneshot
::
<
uint
>
();
let
chan
=
Cell
::
new
(
chan
);
do
spawntask_later
{
chan
.
take
()
.
send
(
parfib
(
n
-
1
));
chan
.send
(
parfib
(
n
-
1
));
};
let
m2
=
parfib
(
n
-
2
);
return
(
port
.recv
()
+
m2
);
...
...
src/test/bench/shootout-chameneos-redux.rs
浏览文件 @
786dea20
...
...
@@ -12,7 +12,6 @@
extern
mod
extra
;
use
std
::
cell
::
Cell
;
use
std
::
comm
::{
stream
,
SharedChan
};
use
std
::
option
;
use
std
::
os
;
...
...
@@ -156,9 +155,11 @@ fn rendezvous(nn: uint, set: ~[color]) {
let
to_rendezvous
=
to_rendezvous
.clone
();
let
to_rendezvous_log
=
to_rendezvous_log
.clone
();
let
(
from_rendezvous
,
to_creature
)
=
stream
();
let
from_rendezvous
=
Cell
::
new
(
from_rendezvous
);
do
task
::
spawn
||
{
creature
(
ii
,
col
,
from_rendezvous
.take
(),
to_rendezvous
.clone
(),
do
task
::
spawn
{
creature
(
ii
,
col
,
from_rendezvous
,
to_rendezvous
.clone
(),
to_rendezvous_log
.clone
());
}
to_creature
...
...
src/test/bench/task-perf-jargon-metal-smoke.rs
浏览文件 @
786dea20
...
...
@@ -17,7 +17,6 @@
//
// The filename is a song reference; google it in quotes.
use
std
::
cell
::
Cell
;
use
std
::
comm
;
use
std
::
os
;
use
std
::
task
;
...
...
@@ -27,9 +26,7 @@ fn child_generation(gens_left: uint, c: comm::Chan<()>) {
// This used to be O(n^2) in the number of generations that ever existed.
// With this code, only as many generations are alive at a time as tasks
// alive at a time,
let
c
=
Cell
::
new
(
c
);
do
spawn
{
let
c
=
c
.take
();
if
gens_left
&
1
==
1
{
task
::
deschedule
();
// shake things up a bit
}
...
...
src/test/compile-fail/no-send-res-ports.rs
浏览文件 @
786dea20
...
...
@@ -10,7 +10,6 @@
#[feature(managed_boxes)]
;
use
std
::
cell
::
Cell
;
use
std
::
task
;
struct
Port
<
T
>
(
@
T
);
...
...
@@ -31,10 +30,10 @@ fn foo(x: Port<()>) -> foo {
}
}
let
x
=
Cell
::
new
(
foo
(
Port
(
@
()
)));
let
x
=
foo
(
Port
(
@
(
)));
do
task
::
spawn
{
let
y
=
x
.take
()
;
//~ ERROR does not fulfill `Send`
let
y
=
x
;
//~ ERROR does not fulfill `Send`
error!
(
"{:?}"
,
y
);
}
}
src/test/compile-fail/no_freeze-rc.rs
浏览文件 @
786dea20
...
...
@@ -9,11 +9,11 @@
// except according to those terms.
use
std
::
rc
::
Rc
;
use
std
::
cell
::
Cell
;
use
std
::
cell
::
Ref
Cell
;
fn
bar
<
T
:
Freeze
>
(
_
:
T
)
{}
fn
main
()
{
let
x
=
Rc
::
from_send
(
Cell
::
new
(
5
));
bar
(
x
);
//~ ERROR instantiating a type parameter with an incompatible type `std::rc::Rc<std::cell::Cell<int>>`, which does not fulfill `Freeze`
let
x
=
Rc
::
from_send
(
Ref
Cell
::
new
(
5
));
bar
(
x
);
//~ ERROR instantiating a type parameter with an incompatible type `std::rc::Rc<std::cell::
Ref
Cell<int>>`, which does not fulfill `Freeze`
}
src/test/run-pass/issue-2718.rs
浏览文件 @
786dea20
...
...
@@ -313,8 +313,6 @@ pub fn main() {
// Commented out because of option::get error
let (client_, server_) = pingpong::init();
let client_ = Cell::new(client_);
let server_ = Cell::new(server_);
task::spawn {|client_|
let client__ = client_.take();
...
...
src/test/run-pass/sendfn-spawn-with-fn-arg.rs
浏览文件 @
786dea20
...
...
@@ -8,7 +8,6 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use
std
::
cell
::
Cell
;
use
std
::
task
;
pub
fn
main
()
{
test05
();
}
...
...
@@ -23,8 +22,7 @@ fn test05() {
error!
(
"{}"
,
*
three
+
n
);
// will copy x into the closure
assert_eq!
(
*
three
,
3
);
};
let
fn_to_send
=
Cell
::
new
(
fn_to_send
);
task
::
spawn
(
proc
()
{
test05_start
(
fn_to_send
.take
()
);
test05_start
(
fn_to_send
);
});
}
src/test/run-pass/task-killjoin-rsrc.rs
浏览文件 @
786dea20
...
...
@@ -13,7 +13,6 @@
// A port of task-killjoin to use a class with a dtor to manage
// the join.
use
std
::
cell
::
Cell
;
use
std
::
comm
::
*
;
use
std
::
ptr
;
use
std
::
task
;
...
...
@@ -55,9 +54,8 @@ fn wrapper(c: Chan<bool>, f: ||) {
*
b
=
true
;
}
let
(
p
,
c
)
=
stream
();
let
c
=
Cell
::
new
(
c
);
do
task
::
spawn_unlinked
{
let
ccc
=
c
.take
()
;
let
ccc
=
c
;
wrapper
(
ccc
,
f
)
}
p
...
...
src/test/run-pass/tempfile.rs
浏览文件 @
786dea20
...
...
@@ -22,11 +22,10 @@
extern
mod
extra
;
use
extra
::
tempfile
::
TempDir
;
use
std
::
io
::
fs
;
use
std
::
io
;
use
std
::
os
;
use
std
::
task
;
use
std
::
cell
::
Cell
;
use
std
::
io
;
use
std
::
io
::
fs
;
fn
test_tempdir
()
{
let
path
=
{
...
...
@@ -51,9 +50,8 @@ fn test_rm_tempdir() {
let
tmp
=
TempDir
::
new
(
"test_rm_tempdir"
)
.unwrap
();
let
path
=
tmp
.path
()
.clone
();
let
cell
=
Cell
::
new
(
tmp
);
let
f
:
proc
()
=
proc
()
{
let
_
tmp
=
cell
.take
()
;
let
_
tmp
=
tmp
;
fail
!
(
"fail to unwind past `tmp`"
);
};
task
::
try
(
f
);
...
...
src/test/run-pass/trait-bounds-in-arc.rs
浏览文件 @
786dea20
...
...
@@ -16,10 +16,10 @@
// xfail-fast
extern
mod
extra
;
use
extra
::
arc
;
use
std
::
comm
;
use
std
::
task
;
use
std
::
cell
;
trait
Pet
{
fn
name
(
&
self
,
blk
:
|
&
str
|);
...
...
@@ -71,14 +71,14 @@ fn main() {
~
fishe
as
~
Pet
:
Freeze
+
Send
,
~
dogge2
as
~
Pet
:
Freeze
+
Send
]);
let
(
p1
,
c1
)
=
comm
::
stream
();
let
arc1
=
cell
::
Cell
::
new
(
arc
.clone
()
);
do
task
::
spawn
{
check_legs
(
arc1
.take
()
);
c1
.send
(());
}
let
arc1
=
arc
.clone
(
);
do
task
::
spawn
{
check_legs
(
arc1
);
c1
.send
(());
}
let
(
p2
,
c2
)
=
comm
::
stream
();
let
arc2
=
cell
::
Cell
::
new
(
arc
.clone
()
);
do
task
::
spawn
{
check_names
(
arc2
.take
()
);
c2
.send
(());
}
let
arc2
=
arc
.clone
(
);
do
task
::
spawn
{
check_names
(
arc2
);
c2
.send
(());
}
let
(
p3
,
c3
)
=
comm
::
stream
();
let
arc3
=
cell
::
Cell
::
new
(
arc
.clone
()
);
do
task
::
spawn
{
check_pedigree
(
arc3
.take
()
);
c3
.send
(());
}
let
arc3
=
arc
.clone
(
);
do
task
::
spawn
{
check_pedigree
(
arc3
);
c3
.send
(());
}
p1
.recv
();
p2
.recv
();
p3
.recv
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录