Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
int
Rust
提交
a787f400
R
Rust
项目概览
int
/
Rust
大约 1 年 前同步成功
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rust
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
a787f400
编写于
7月 03, 2012
作者:
E
Eric Holk
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Select on pipes.
Updating syntax and test cases.
上级
89bdd481
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
284 addition
and
384 deletion
+284
-384
src/libcore/pipes.rs
src/libcore/pipes.rs
+120
-22
src/libcore/vec.rs
src/libcore/vec.rs
+38
-5
src/test/bench/msgsend-ring-contracts.rs
src/test/bench/msgsend-ring-contracts.rs
+1
-1
src/test/run-pass/pipe-manual-2.rs
src/test/run-pass/pipe-manual-2.rs
+0
-178
src/test/run-pass/pipe-manual-3.rs
src/test/run-pass/pipe-manual-3.rs
+0
-178
src/test/run-pass/pipe-select.rs
src/test/run-pass/pipe-select.rs
+125
-0
未找到文件。
src/libcore/pipes.rs
浏览文件 @
a787f400
...
...
@@ -9,23 +9,29 @@ enum state {
terminated
}
type
packet
<
T
:
send
>
=
{
type
packet
_header
=
{
mut
state
:
state
,
mut
blocked_task
:
option
<*
rust_task
>
,
};
type
packet
<
T
:
send
>
=
{
header
:
packet_header
,
mut
payload
:
option
<
T
>
};
fn
packet
<
T
:
send
>
()
->
*
packet
<
T
>
unsafe
{
let
p
:
*
packet
<
T
>
=
unsafe
::
transmute
(
~
{
mut
state
:
empty
,
mut
blocked_task
:
none
::
<
task
::
task
>
,
header
:
{
mut
state
:
empty
,
mut
blocked_task
:
none
::
<
task
::
task
>
,
},
mut
payload
:
none
::
<
T
>
});
p
}
#[abi
=
"rust-intrinsic"
]
native
mod
rusti
{
extern
mod
rusti
{
fn
atomic_xchng
(
&
dst
:
int
,
src
:
int
)
->
int
;
fn
atomic_xchng_acq
(
&
dst
:
int
,
src
:
int
)
->
int
;
fn
atomic_xchng_rel
(
&
dst
:
int
,
src
:
int
)
->
int
;
...
...
@@ -33,7 +39,7 @@ fn packet<T: send>() -> *packet<T> unsafe {
type
rust_task
=
libc
::
c_void
;
native
mod
rustrt
{
extern
mod
rustrt
{
#[rust_stack]
fn
rust_get_task
()
->
*
rust_task
;
...
...
@@ -71,7 +77,7 @@ fn send<T: send>(-p: send_packet<T>, -payload: T) {
let
p
=
unsafe
{
uniquify
(
p_
)
};
assert
(
*
p
)
.payload
==
none
;
(
*
p
)
.payload
<-
some
(
payload
);
let
old_state
=
swap_state_rel
(
(
*
p
)
.state
,
full
);
let
old_state
=
swap_state_rel
(
p
.header
.state
,
full
);
alt
old_state
{
empty
{
// Yay, fastpath.
...
...
@@ -82,9 +88,10 @@ fn send<T: send>(-p: send_packet<T>, -payload: T) {
full
{
fail
"duplicate send"
}
blocked
{
#
debug
(
"waking up task for %?"
,
p_
);
alt
p
.blocked_task
{
alt
p
.
header.
blocked_task
{
some
(
task
)
{
rustrt
::
task_signal_event
(
task
,
p_
as
*
libc
::
c_void
);
rustrt
::
task_signal_event
(
task
,
ptr
::
addr_of
(
p
.header
)
as
*
libc
::
c_void
);
}
none
{
fail
"blocked packet has no task"
}
}
...
...
@@ -104,20 +111,20 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
let
p
=
unsafe
{
uniquify
(
p_
)
};
let
this
=
rustrt
::
rust_get_task
();
rustrt
::
task_clear_event_reject
(
this
);
p
.blocked_task
=
some
(
this
);
p
.
header.
blocked_task
=
some
(
this
);
loop
{
let
old_state
=
swap_state_acq
(
(
*
p
)
.state
,
let
old_state
=
swap_state_acq
(
p
.header
.state
,
blocked
);
#
debug
(
"%?"
,
old_state
);
alt
old_state
{
empty
{
#
debug
(
"no data available on %?, going to sleep."
,
p_
);
rustrt
::
task_wait_event
(
this
);
#
debug
(
"woke up, p.state = %?"
,
p
.state
);
if
p
.state
==
full
{
#
debug
(
"woke up, p.state = %?"
,
p
.
header.
state
);
if
p
.
header.
state
==
full
{
let
mut
payload
=
none
;
payload
<->
(
*
p
)
.payload
;
p
.state
=
terminated
;
p
.
header.
state
=
terminated
;
ret
some
(
option
::
unwrap
(
payload
))
}
}
...
...
@@ -125,7 +132,7 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
full
{
let
mut
payload
=
none
;
payload
<->
(
*
p
)
.payload
;
p
.state
=
terminated
;
p
.
header.
state
=
terminated
;
ret
some
(
option
::
unwrap
(
payload
))
}
terminated
{
...
...
@@ -138,7 +145,7 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
fn
sender_terminate
<
T
:
send
>
(
p
:
*
packet
<
T
>
)
{
let
p
=
unsafe
{
uniquify
(
p
)
};
alt
swap_state_rel
(
(
*
p
)
.state
,
terminated
)
{
alt
swap_state_rel
(
p
.header
.state
,
terminated
)
{
empty
|
blocked
{
// The receiver will eventually clean up.
unsafe
{
forget
(
p
)
}
...
...
@@ -155,7 +162,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
fn
receiver_terminate
<
T
:
send
>
(
p
:
*
packet
<
T
>
)
{
let
p
=
unsafe
{
uniquify
(
p
)
};
alt
swap_state_rel
(
(
*
p
)
.state
,
terminated
)
{
alt
swap_state_rel
(
p
.header
.state
,
terminated
)
{
empty
{
// the sender will clean up
unsafe
{
forget
(
p
)
}
...
...
@@ -170,15 +177,106 @@ fn receiver_terminate<T: send>(p: *packet<T>) {
}
}
impl
private_methods
for
packet_header
{
// Returns the old state.
fn
mark_blocked
(
this
:
*
rust_task
)
->
state
{
self
.blocked_task
=
some
(
this
);
swap_state_acq
(
self
.state
,
blocked
)
}
fn
unblock
()
{
alt
swap_state_acq
(
self
.state
,
empty
)
{
empty
|
blocked
{
}
terminated
{
self
.state
=
terminated
;
}
full
{
self
.state
=
full
;
}
}
}
}
#[doc
=
"Returns when one of the packet headers reports data is
available."
]
fn
wait_many
(
pkts
:
~
[
&
a
.packet_header
])
->
uint
{
let
this
=
rustrt
::
rust_get_task
();
rustrt
::
task_clear_event_reject
(
this
);
let
mut
data_avail
=
false
;
let
mut
ready_packet
=
pkts
.len
();
for
pkts
.eachi
|
i
,
p
|
{
let
old
=
p
.mark_blocked
(
this
);
alt
old
{
full
|
terminated
{
data_avail
=
true
;
ready_packet
=
i
;
p
.state
=
old
;
break
;
}
blocked
{
fail
"blocking on blocked packet"
}
empty
{
}
}
}
while
!
data_avail
{
#
debug
(
"sleeping on %? packets"
,
pkts
.len
());
let
event
=
rustrt
::
task_wait_event
(
this
)
as
*
packet_header
;
let
pos
=
vec
::
position
(
pkts
,
|
p
|
ptr
::
addr_of
(
*
p
)
==
event
);
alt
pos
{
some
(
i
)
{
ready_packet
=
i
;
data_avail
=
true
;
}
none
{
#
debug
(
"ignoring spurious event, %?"
,
event
);
}
}
}
#
debug
(
"%?"
,
pkts
[
ready_packet
]);
for
pkts
.each
|
p
|
{
p
.unblock
()
}
#
debug
(
"%?, %?"
,
ready_packet
,
pkts
[
ready_packet
]);
assert
pkts
[
ready_packet
]
.state
==
full
||
pkts
[
ready_packet
]
.state
==
terminated
;
ready_packet
}
#[doc
=
"Waits on a set of endpoints. Returns a message, its index,
and a list of the remaining endpoints."
]
fn
select
<
T
:
send
>
(
+
endpoints
:
~
[
recv_packet
<
T
>
])
->
(
uint
,
option
<
T
>
,
~
[
recv_packet
<
T
>
])
{
let
endpoints
=
vec
::
map_consume
(
endpoints
,
|
p
|
unsafe
{
uniquify
(
p
.unwrap
())
});
let
endpoints_r
=
vec
::
view
(
endpoints
,
0
,
endpoints
.len
());
let
ready
=
wait_many
(
endpoints_r
.map_r
(|
p
|
&
p
.header
));
let
mut
remaining
=
~
[];
let
mut
result
=
none
;
do
vec
::
consume
(
endpoints
)
|
i
,
p
|
{
let
p
=
recv_packet
(
unsafe
{
unsafe
::
transmute
(
p
)
});
if
i
==
ready
{
result
=
recv
(
p
);
}
else
{
vec
::
push
(
remaining
,
p
);
}
}
(
ready
,
result
,
remaining
)
}
class
send_packet
<
T
:
send
>
{
let
mut
p
:
option
<*
packet
<
T
>>
;
new
(
p
:
*
packet
<
T
>
)
{
//#
error
("take send %?", p);
//#
debug
("take send %?", p);
self
.p
=
some
(
p
);
}
drop
{
//if self.p != none {
// #
error
("drop send %?", option::get(self.p));
// #
debug
("drop send %?", option::get(self.p));
//}
if
self
.p
!=
none
{
let
mut
p
=
none
;
...
...
@@ -196,12 +294,12 @@ fn unwrap() -> *packet<T> {
class
recv_packet
<
T
:
send
>
{
let
mut
p
:
option
<*
packet
<
T
>>
;
new
(
p
:
*
packet
<
T
>
)
{
//#
error
("take recv %?", p);
//#
debug
("take recv %?", p);
self
.p
=
some
(
p
);
}
drop
{
//if self.p != none {
// #
error
("drop recv %?", option::get(self.p));
// #
debug
("drop recv %?", option::get(self.p));
//}
if
self
.p
!=
none
{
let
mut
p
=
none
;
...
...
@@ -222,7 +320,7 @@ fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {
}
fn
spawn_service
<
T
:
send
>
(
init
:
native
fn
()
->
(
send_packet
<
T
>
,
recv_packet
<
T
>
),
init
:
extern
fn
()
->
(
send_packet
<
T
>
,
recv_packet
<
T
>
),
+
service
:
fn
~
(
+
recv_packet
<
T
>
))
->
send_packet
<
T
>
{
...
...
@@ -241,7 +339,7 @@ fn spawn_service<T: send>(
}
fn
spawn_service_recv
<
T
:
send
>
(
init
:
native
fn
()
->
(
recv_packet
<
T
>
,
send_packet
<
T
>
),
init
:
extern
fn
()
->
(
recv_packet
<
T
>
,
send_packet
<
T
>
),
+
service
:
fn
~
(
+
send_packet
<
T
>
))
->
recv_packet
<
T
>
{
...
...
src/libcore/vec.rs
浏览文件 @
a787f400
...
...
@@ -6,6 +6,7 @@
export
append
;
export
append_one
;
export
consume
;
export
init_op
;
export
is_empty
;
export
is_not_empty
;
...
...
@@ -40,6 +41,7 @@
export
map
;
export
mapi
;
export
map2
;
export
map_consume
;
export
flat_map
;
export
filter_map
;
export
filter
;
...
...
@@ -261,8 +263,8 @@ fn from_mut<T>(+v: ~[mut T]) -> ~[T] {
ret
result
;
}
/// Return a slice that points into another slice.
pure
fn
view
<
T
:
copy
>
(
v
:
&
[
const
T
],
start
:
uint
,
end
:
uint
)
->
&
a
.
[
T
]
{
#[doc
=
"Return a slice that points into another slice."
]
pure
fn
view
<
T
>
(
v
:
&
[
const
T
],
start
:
uint
,
end
:
uint
)
->
&
a
.
[
T
]
{
assert
(
start
<=
end
);
assert
(
end
<=
len
(
v
));
do
unpack_slice
(
v
)
|
p
,
_
len
|
{
...
...
@@ -373,7 +375,7 @@ fn rsplitn<T: copy>(v: &[T], n: uint, f: fn(T) -> bool) -> ~[~[T]] {
/// Removes the first element from a vector and return it
fn
shift
<
T
>
(
&
v
:
~
[
T
])
->
T
{
let
ln
=
len
::
<
T
>
(
v
);
assert
(
ln
>
0
u
);
assert
(
ln
>
0
);
let
mut
vv
=
~
[];
v
<->
vv
;
...
...
@@ -384,12 +386,12 @@ fn shift<T>(&v: ~[T]) -> T {
let
vv
=
unsafe
::
to_ptr
(
vv
);
rr
<-
*
vv
;
for
uint
::
range
(
1
u
,
ln
)
|
i
|
{
for
uint
::
range
(
1
,
ln
)
|
i
|
{
let
r
<-
*
ptr
::
offset
(
vv
,
i
);
push
(
v
,
r
);
}
}
unsafe
::
set_len
(
vv
,
0
u
);
unsafe
::
set_len
(
vv
,
0
);
rr
}
...
...
@@ -404,6 +406,17 @@ fn unshift<T>(&v: ~[T], +x: T) {
}
}
fn
consume
<
T
>
(
+
v
:
~
[
T
],
f
:
fn
(
uint
,
+
T
))
unsafe
{
do
unpack_slice
(
v
)
|
p
,
ln
|
{
for
uint
::
range
(
0
,
ln
)
|
i
|
{
let
x
<-
*
ptr
::
offset
(
p
,
i
);
f
(
i
,
x
);
}
}
unsafe
::
set_len
(
v
,
0
);
}
/// Remove the last element from a vector and return it
fn
pop
<
T
>
(
&
v
:
~
[
const
T
])
->
T
{
let
ln
=
len
(
v
);
...
...
@@ -575,6 +588,14 @@ fn grow_set<T: copy>(&v: ~[mut T], index: uint, initval: T, val: T) {
ret
result
;
}
fn
map_consume
<
T
,
U
>
(
+
v
:
~
[
T
],
f
:
fn
(
+
T
)
->
U
)
->
~
[
U
]
{
let
mut
result
=
~
[];
do
consume
(
v
)
|
_
i
,
x
|
{
vec
::
push
(
result
,
f
(
x
));
}
result
}
/// Apply a function to each element of a vector and return the results
pure
fn
mapi
<
T
,
U
>
(
v
:
&
[
T
],
f
:
fn
(
uint
,
T
)
->
U
)
->
~
[
U
]
{
let
mut
result
=
~
[];
...
...
@@ -1277,6 +1298,18 @@ impl extensions/&<T> for &[T] {
pure
fn
mapi
<
U
>
(
f
:
fn
(
uint
,
T
)
->
U
)
->
~
[
U
]
{
mapi
(
self
,
f
)
}
#[inline]
fn
map_r
<
U
>
(
f
:
fn
(
x
:
&
self
.T
)
->
U
)
->
~
[
U
]
{
let
mut
r
=
~
[];
let
mut
i
=
0
;
while
i
<
self
.len
()
{
push
(
r
,
f
(
&
self
[
i
]));
i
+=
1
;
}
r
}
/**
* Returns true if the function returns true for all elements.
*
...
...
src/test/bench/msgsend-ring-contracts.rs
浏览文件 @
a787f400
...
...
@@ -119,7 +119,7 @@ fn main(args: [str]/~) {
thread_ring
(
0u
,
msg_per_task
,
option
::
unwrap
(
num_chan
),
num_port
);
// synchronize
for
futures
.each
|
f
|
{
f
.get
(
)
};
for
futures
.each
|
f
|
{
f
uture
::
get
(
f
)
};
let
stop
=
time
::
precise_time_s
();
...
...
src/test/run-pass/pipe-manual-2.rs
浏览文件 @
a787f400
...
...
@@ -13,184 +13,6 @@
*/
// Hopefully someday we'll move this into core.
mod
pipes
{
import
unsafe
::{
forget
,
reinterpret_cast
};
enum
state
{
empty
,
full
,
blocked
,
terminated
}
type
packet
<
T
:
send
>
=
{
mut
state
:
state
,
mut
blocked_task
:
option
<
task
::
task
>
,
mut
payload
:
option
<
T
>
};
fn
packet
<
T
:
send
>
()
->
*
packet
<
T
>
unsafe
{
let
p
:
*
packet
<
T
>
=
unsafe
::
transmute
(
~
{
mut
state
:
empty
,
mut
blocked_task
:
none
::
<
task
::
task
>
,
mut
payload
:
none
::
<
T
>
});
p
}
#[abi
=
"rust-intrinsic"
]
native
mod
rusti
{
fn
atomic_xchng
(
&
dst
:
int
,
src
:
int
)
->
int
;
fn
atomic_xchng_acq
(
&
dst
:
int
,
src
:
int
)
->
int
;
fn
atomic_xchng_rel
(
&
dst
:
int
,
src
:
int
)
->
int
;
}
// We should consider moving this to core::unsafe, although I
// suspect graydon would want us to use void pointers instead.
unsafe
fn
uniquify
<
T
>
(
x
:
*
T
)
->
~
T
{
unsafe
{
unsafe
::
reinterpret_cast
(
x
)
}
}
fn
swap_state_acq
(
&
dst
:
state
,
src
:
state
)
->
state
{
unsafe
{
reinterpret_cast
(
rusti
::
atomic_xchng_acq
(
*
(
ptr
::
mut_addr_of
(
dst
)
as
*
mut
int
),
src
as
int
))
}
}
fn
swap_state_rel
(
&
dst
:
state
,
src
:
state
)
->
state
{
unsafe
{
reinterpret_cast
(
rusti
::
atomic_xchng_rel
(
*
(
ptr
::
mut_addr_of
(
dst
)
as
*
mut
int
),
src
as
int
))
}
}
fn
send
<
T
:
send
>
(
-
p
:
send_packet
<
T
>
,
-
payload
:
T
)
{
let
p
=
p
.unwrap
();
let
p
=
unsafe
{
uniquify
(
p
)
};
assert
(
*
p
)
.payload
==
none
;
(
*
p
)
.payload
<-
some
(
payload
);
let
old_state
=
swap_state_rel
((
*
p
)
.state
,
full
);
alt
old_state
{
empty
{
// Yay, fastpath.
// The receiver will eventually clean this up.
unsafe
{
forget
(
p
);
}
}
full
{
fail
"duplicate send"
}
blocked
{
// FIXME: once the target will actually block, tell the
// scheduler to wake it up.
// The receiver will eventually clean this up.
unsafe
{
forget
(
p
);
}
}
terminated
{
// The receiver will never receive this. Rely on drop_glue
// to clean everything up.
}
}
}
fn
recv
<
T
:
send
>
(
-
p
:
recv_packet
<
T
>
)
->
option
<
T
>
{
let
p
=
p
.unwrap
();
let
p
=
unsafe
{
uniquify
(
p
)
};
loop
{
let
old_state
=
swap_state_acq
((
*
p
)
.state
,
blocked
);
alt
old_state
{
empty
|
blocked
{
task
::
yield
();
}
full
{
let
mut
payload
=
none
;
payload
<->
(
*
p
)
.payload
;
ret
some
(
option
::
unwrap
(
payload
))
}
terminated
{
assert
old_state
==
terminated
;
ret
none
;
}
}
}
}
fn
sender_terminate
<
T
:
send
>
(
p
:
*
packet
<
T
>
)
{
let
p
=
unsafe
{
uniquify
(
p
)
};
alt
swap_state_rel
((
*
p
)
.state
,
terminated
)
{
empty
|
blocked
{
// The receiver will eventually clean up.
unsafe
{
forget
(
p
)
}
}
full
{
// This is impossible
fail
"you dun goofed"
}
terminated
{
// I have to clean up, use drop_glue
}
}
}
fn
receiver_terminate
<
T
:
send
>
(
p
:
*
packet
<
T
>
)
{
let
p
=
unsafe
{
uniquify
(
p
)
};
alt
swap_state_rel
((
*
p
)
.state
,
terminated
)
{
empty
{
// the sender will clean up
unsafe
{
forget
(
p
)
}
}
blocked
{
// this shouldn't happen.
fail
"terminating a blocked packet"
}
terminated
|
full
{
// I have to clean up, use drop_glue
}
}
}
class
send_packet
<
T
:
send
>
{
let
mut
p
:
option
<*
packet
<
T
>>
;
new
(
p
:
*
packet
<
T
>
)
{
self
.p
=
some
(
p
);
}
drop
{
if
self
.p
!=
none
{
let
mut
p
=
none
;
p
<->
self
.p
;
sender_terminate
(
option
::
unwrap
(
p
))
}
}
fn
unwrap
()
->
*
packet
<
T
>
{
let
mut
p
=
none
;
p
<->
self
.p
;
option
::
unwrap
(
p
)
}
}
class
recv_packet
<
T
:
send
>
{
let
mut
p
:
option
<*
packet
<
T
>>
;
new
(
p
:
*
packet
<
T
>
)
{
self
.p
=
some
(
p
);
}
drop
{
if
self
.p
!=
none
{
let
mut
p
=
none
;
p
<->
self
.p
;
receiver_terminate
(
option
::
unwrap
(
p
))
}
}
fn
unwrap
()
->
*
packet
<
T
>
{
let
mut
p
=
none
;
p
<->
self
.p
;
option
::
unwrap
(
p
)
}
}
fn
entangle
<
T
:
send
>
()
->
(
send_packet
<
T
>
,
recv_packet
<
T
>
)
{
let
p
=
packet
();
(
send_packet
(
p
),
recv_packet
(
p
))
}
}
mod
pingpong
{
enum
ping
=
*
pipes
::
packet
<
pong
>
;
enum
pong
=
*
pipes
::
packet
<
ping
>
;
...
...
src/test/run-pass/pipe-manual-3.rs
浏览文件 @
a787f400
...
...
@@ -15,184 +15,6 @@
*/
// Hopefully someday we'll move this into core.
mod
pipes
{
import
unsafe
::{
forget
,
reinterpret_cast
};
enum
state
{
empty
,
full
,
blocked
,
terminated
}
type
packet
<
T
:
send
>
=
{
mut
state
:
state
,
mut
blocked_task
:
option
<
task
::
task
>
,
mut
payload
:
option
<
T
>
};
fn
packet
<
T
:
send
>
()
->
*
packet
<
T
>
unsafe
{
let
p
:
*
packet
<
T
>
=
unsafe
::
transmute
(
~
{
mut
state
:
empty
,
mut
blocked_task
:
none
::
<
task
::
task
>
,
mut
payload
:
none
::
<
T
>
});
p
}
#[abi
=
"rust-intrinsic"
]
native
mod
rusti
{
fn
atomic_xchng
(
&
dst
:
int
,
src
:
int
)
->
int
;
fn
atomic_xchng_acq
(
&
dst
:
int
,
src
:
int
)
->
int
;
fn
atomic_xchng_rel
(
&
dst
:
int
,
src
:
int
)
->
int
;
}
// We should consider moving this to core::unsafe, although I
// suspect graydon would want us to use void pointers instead.
unsafe
fn
uniquify
<
T
>
(
x
:
*
T
)
->
~
T
{
unsafe
{
unsafe
::
reinterpret_cast
(
x
)
}
}
fn
swap_state_acq
(
&
dst
:
state
,
src
:
state
)
->
state
{
unsafe
{
reinterpret_cast
(
rusti
::
atomic_xchng_acq
(
*
(
ptr
::
mut_addr_of
(
dst
)
as
*
mut
int
),
src
as
int
))
}
}
fn
swap_state_rel
(
&
dst
:
state
,
src
:
state
)
->
state
{
unsafe
{
reinterpret_cast
(
rusti
::
atomic_xchng_rel
(
*
(
ptr
::
mut_addr_of
(
dst
)
as
*
mut
int
),
src
as
int
))
}
}
fn
send
<
T
:
send
>
(
-
p
:
send_packet
<
T
>
,
-
payload
:
T
)
{
let
p
=
p
.unwrap
();
let
p
=
unsafe
{
uniquify
(
p
)
};
assert
(
*
p
)
.payload
==
none
;
(
*
p
)
.payload
<-
some
(
payload
);
let
old_state
=
swap_state_rel
((
*
p
)
.state
,
full
);
alt
old_state
{
empty
{
// Yay, fastpath.
// The receiver will eventually clean this up.
unsafe
{
forget
(
p
);
}
}
full
{
fail
"duplicate send"
}
blocked
{
// FIXME: once the target will actually block, tell the
// scheduler to wake it up.
// The receiver will eventually clean this up.
unsafe
{
forget
(
p
);
}
}
terminated
{
// The receiver will never receive this. Rely on drop_glue
// to clean everything up.
}
}
}
fn
recv
<
T
:
send
>
(
-
p
:
recv_packet
<
T
>
)
->
option
<
T
>
{
let
p
=
p
.unwrap
();
let
p
=
unsafe
{
uniquify
(
p
)
};
loop
{
let
old_state
=
swap_state_acq
((
*
p
)
.state
,
blocked
);
alt
old_state
{
empty
|
blocked
{
task
::
yield
();
}
full
{
let
mut
payload
=
none
;
payload
<->
(
*
p
)
.payload
;
ret
some
(
option
::
unwrap
(
payload
))
}
terminated
{
assert
old_state
==
terminated
;
ret
none
;
}
}
}
}
fn
sender_terminate
<
T
:
send
>
(
p
:
*
packet
<
T
>
)
{
let
p
=
unsafe
{
uniquify
(
p
)
};
alt
swap_state_rel
((
*
p
)
.state
,
terminated
)
{
empty
|
blocked
{
// The receiver will eventually clean up.
unsafe
{
forget
(
p
)
}
}
full
{
// This is impossible
fail
"you dun goofed"
}
terminated
{
// I have to clean up, use drop_glue
}
}
}
fn
receiver_terminate
<
T
:
send
>
(
p
:
*
packet
<
T
>
)
{
let
p
=
unsafe
{
uniquify
(
p
)
};
alt
swap_state_rel
((
*
p
)
.state
,
terminated
)
{
empty
{
// the sender will clean up
unsafe
{
forget
(
p
)
}
}
blocked
{
// this shouldn't happen.
fail
"terminating a blocked packet"
}
terminated
|
full
{
// I have to clean up, use drop_glue
}
}
}
class
send_packet
<
T
:
send
>
{
let
mut
p
:
option
<*
packet
<
T
>>
;
new
(
p
:
*
packet
<
T
>
)
{
self
.p
=
some
(
p
);
}
drop
{
if
self
.p
!=
none
{
let
mut
p
=
none
;
p
<->
self
.p
;
sender_terminate
(
option
::
unwrap
(
p
))
}
}
fn
unwrap
()
->
*
packet
<
T
>
{
let
mut
p
=
none
;
p
<->
self
.p
;
option
::
unwrap
(
p
)
}
}
class
recv_packet
<
T
:
send
>
{
let
mut
p
:
option
<*
packet
<
T
>>
;
new
(
p
:
*
packet
<
T
>
)
{
self
.p
=
some
(
p
);
}
drop
{
if
self
.p
!=
none
{
let
mut
p
=
none
;
p
<->
self
.p
;
receiver_terminate
(
option
::
unwrap
(
p
))
}
}
fn
unwrap
()
->
*
packet
<
T
>
{
let
mut
p
=
none
;
p
<->
self
.p
;
option
::
unwrap
(
p
)
}
}
fn
entangle
<
T
:
send
>
()
->
(
send_packet
<
T
>
,
recv_packet
<
T
>
)
{
let
p
=
packet
();
(
send_packet
(
p
),
recv_packet
(
p
))
}
}
mod
pingpong
{
enum
ping
{
ping
,
}
enum
ping_message
=
*
pipes
::
packet
<
pong_message
>
;
...
...
src/test/run-pass/pipe-select.rs
0 → 100644
浏览文件 @
a787f400
use
std
;
import
std
::
timer
::
sleep
;
import
std
::
uv
;
import
pipes
::{
recv
,
select
};
// Compiled by pipec
mod
oneshot
{
fn
init
()
->
(
client
::
waiting
,
server
::
waiting
)
{
pipes
::
entangle
()
}
enum
waiting
{
signal
(
server
::
signaled
),
}
enum
signaled
{
}
mod
client
{
fn
signal
(
-
pipe
:
waiting
)
->
signaled
{
let
(
c
,
s
)
=
pipes
::
entangle
();
let
message
=
oneshot
::
signal
(
s
);
pipes
::
send
(
pipe
,
message
);
c
}
type
waiting
=
pipes
::
send_packet
<
oneshot
::
waiting
>
;
type
signaled
=
pipes
::
send_packet
<
oneshot
::
signaled
>
;
}
mod
server
{
impl
recv
for
waiting
{
fn
recv
()
->
extern
fn
(
-
waiting
)
->
oneshot
::
waiting
{
fn
recv
(
-
pipe
:
waiting
)
->
oneshot
::
waiting
{
option
::
unwrap
(
pipes
::
recv
(
pipe
))
}
recv
}
}
type
waiting
=
pipes
::
recv_packet
<
oneshot
::
waiting
>
;
impl
recv
for
signaled
{
fn
recv
()
->
extern
fn
(
-
signaled
)
->
oneshot
::
signaled
{
fn
recv
(
-
pipe
:
signaled
)
->
oneshot
::
signaled
{
option
::
unwrap
(
pipes
::
recv
(
pipe
))
}
recv
}
}
type
signaled
=
pipes
::
recv_packet
<
oneshot
::
signaled
>
;
}
}
mod
stream
{
fn
init
<
T
:
send
>
()
->
(
client
::
stream
<
T
>
,
server
::
stream
<
T
>
)
{
pipes
::
entangle
()
}
enum
stream
<
T
:
send
>
{
send
(
T
,
server
::
stream
<
T
>
),
}
mod
client
{
fn
send
<
T
:
send
>
(
+
pipe
:
stream
<
T
>
,
+
x_0
:
T
)
->
stream
<
T
>
{
{
let
(
c
,
s
)
=
pipes
::
entangle
();
let
message
=
stream
::
send
(
x_0
,
s
);
pipes
::
send
(
pipe
,
message
);
c
}
}
type
stream
<
T
:
send
>
=
pipes
::
send_packet
<
stream
::
stream
<
T
>>
;
}
mod
server
{
impl
recv
<
T
:
send
>
for
stream
<
T
>
{
fn
recv
()
->
extern
fn
(
+
stream
<
T
>
)
->
stream
::
stream
<
T
>
{
fn
recv
<
T
:
send
>
(
+
pipe
:
stream
<
T
>
)
->
stream
::
stream
<
T
>
{
option
::
unwrap
(
pipes
::
recv
(
pipe
))
}
recv
}
}
type
stream
<
T
:
send
>
=
pipes
::
recv_packet
<
stream
::
stream
<
T
>>
;
}
}
fn
main
()
{
import
oneshot
::
client
::
*
;
import
stream
::
client
::
*
;
let
iotask
=
uv
::
global_loop
::
get
();
#
macro
[
[
#
recv
[
chan
],
chan
.recv
()(
chan
)]
];
let
c
=
pipes
::
spawn_service
(
stream
::
init
,
|
p
|
{
#
error
(
"waiting for pipes"
);
let
stream
::
send
(
x
,
p
)
=
option
::
unwrap
(
recv
(
p
));
#
error
(
"got pipes"
);
let
(
left
,
right
)
:
(
oneshot
::
server
::
waiting
,
oneshot
::
server
::
waiting
)
=
x
;
#
error
(
"selecting"
);
let
(
i
,
_
,
_
)
=
select
(
~
[
left
,
right
]);
#
error
(
"selected"
);
assert
i
==
0
;
#
error
(
"waiting for pipes"
);
let
stream
::
send
(
x
,
_
)
=
option
::
unwrap
(
recv
(
p
));
#
error
(
"got pipes"
);
let
(
left
,
right
)
:
(
oneshot
::
server
::
waiting
,
oneshot
::
server
::
waiting
)
=
x
;
#
error
(
"selecting"
);
let
(
i
,
_
,
_
)
=
select
(
~
[
left
,
right
]);
#
error
(
"selected"
);
assert
i
==
1
;
});
let
(
c1
,
p1
)
=
oneshot
::
init
();
let
(
c2
,
p2
)
=
oneshot
::
init
();
let
c
=
send
(
c
,
(
p1
,
p2
));
sleep
(
iotask
,
1000
);
signal
(
c1
);
let
(
c1
,
p1
)
=
oneshot
::
init
();
let
(
c2
,
p2
)
=
oneshot
::
init
();
send
(
c
,
(
p1
,
p2
));
sleep
(
iotask
,
1000
);
signal
(
c2
);
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录