提交 93ca5ebc 编写于 作者: B Brian Anderson

core::rt: Clean up the interface to rtio

Make names that better match rt::io. Return error types.
上级 b2fbd346
......@@ -238,6 +238,7 @@
* How does I/O relate to the Iterator trait?
* std::base64 filters
* Using conditions is a big unknown since we don't have much experience with them
* Too many uses of OtherIoError
*/
......
......@@ -9,21 +9,22 @@
// except according to those terms.
use option::{Option, Some, None};
use result::{Result, Ok, Err};
use result::{Ok, Err};
use ops::Drop;
use rt::sched::local_sched::unsafe_borrow_io;
use rt::io::net::ip::IpAddr;
use rt::io::{Reader, Writer, Listener};
use rt::io::io_error;
use rt::rtio;
use rt::rtio::{IoFactory, TcpListener, Stream};
use rt::rtio::{IoFactory,
RtioTcpListener, RtioTcpListenerObject,
RtioTcpStream, RtioTcpStreamObject};
pub struct TcpStream {
rtstream: ~rtio::StreamObject
rtstream: ~RtioTcpStreamObject
}
impl TcpStream {
fn new(s: ~rtio::StreamObject) -> TcpStream {
fn new(s: ~RtioTcpStreamObject) -> TcpStream {
TcpStream {
rtstream: s
}
......@@ -34,7 +35,7 @@ pub fn connect(addr: IpAddr) -> Option<TcpStream> {
rtdebug!("borrowing io to connect");
let io = unsafe_borrow_io();
rtdebug!("about to connect");
io.connect(addr)
io.tcp_connect(addr)
};
match stream {
......@@ -85,12 +86,12 @@ fn finalize(&self) {
}
pub struct TcpListener {
rtlistener: ~rtio::TcpListenerObject
rtlistener: ~RtioTcpListenerObject
}
impl TcpListener {
pub fn bind(addr: IpAddr) -> Option<TcpListener> {
let listener = unsafe { unsafe_borrow_io().bind(addr) };
let listener = unsafe { unsafe_borrow_io().tcp_bind(addr) };
match listener {
Ok(l) => {
Some(TcpListener {
......@@ -107,12 +108,12 @@ pub fn bind(addr: IpAddr) -> Option<TcpListener> {
impl Listener<TcpStream> for TcpListener {
fn accept(&mut self) -> Option<TcpStream> {
let rtstream = self.rtlistener.listen();
let rtstream = self.rtlistener.accept();
match rtstream {
Some(s) => {
Ok(s) => {
Some(TcpStream::new(s))
}
None => {
Err(_) => {
abort!("TODO");
}
}
......
......@@ -18,8 +18,8 @@
// types to use instead
pub type EventLoopObject = super::uvio::UvEventLoop;
pub type IoFactoryObject = super::uvio::UvIoFactory;
pub type StreamObject = super::uvio::UvStream;
pub type TcpListenerObject = super::uvio::UvTcpListener;
pub type RtioTcpStreamObject = super::uvio::UvTcpStream;
pub type RtioTcpListenerObject = super::uvio::UvTcpListener;
pub trait EventLoop {
fn run(&mut self);
......@@ -29,15 +29,15 @@ pub trait EventLoop {
}
pub trait IoFactory {
fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError>;
fn bind(&mut self, addr: IpAddr) -> Result<~TcpListenerObject, IoError>;
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;
}
pub trait TcpListener {
fn listen(&mut self) -> Option<~StreamObject>;
pub trait RtioTcpListener {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
}
pub trait Stream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()>;
fn write(&mut self, buf: &[u8]) -> Result<(), ()>;
pub trait RtioTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
}
......@@ -48,7 +48,7 @@
use unstable::finally::Finally;
use rt::uvll;
use rt::io::{IoError, FileNotFound};
use rt::io::IoError;
#[cfg(test)] use unstable::run_in_bare_thread;
......
......@@ -10,19 +10,20 @@
use option::*;
use result::*;
use rt::io::IoError;
use super::io::net::ip::IpAddr;
use super::uv::*;
use super::rtio::*;
use ops::Drop;
use cell::{Cell, empty_cell};
use cast::transmute;
use super::sched::{Scheduler, local_sched};
use rt::io::IoError;
use rt::io::net::ip::IpAddr;
use rt::uv::*;
use rt::rtio::*;
use rt::sched::{Scheduler, local_sched};
use rt::io::{standard_error, OtherIoError};
#[cfg(test)] use uint;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::test::*;
#[cfg(test)] use rt::test::*;
pub struct UvEventLoop {
uvio: UvIoFactory
......@@ -99,11 +100,11 @@ impl IoFactory for UvIoFactory {
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError> {
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
// Create a cell in the task to hold the result. We will fill
// the cell before resuming the task.
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<~StreamObject, IoError>> = &result_cell;
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
......@@ -123,7 +124,7 @@ fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError> {
rtdebug!("connect: in connect callback");
let maybe_stream = if status.is_none() {
rtdebug!("status is none");
Ok(~UvStream(stream_watcher))
Ok(~UvTcpStream(stream_watcher))
} else {
rtdebug!("status is some");
// XXX: Wait for close
......@@ -144,7 +145,7 @@ fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError> {
return result_cell.take();
}
fn bind(&mut self, addr: IpAddr) -> Result<~TcpListenerObject, IoError> {
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
let mut watcher = TcpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
Ok(_) => Ok(~UvTcpListener(watcher)),
......@@ -177,12 +178,12 @@ fn finalize(&self) {
}
}
impl TcpListener for UvTcpListener {
impl RtioTcpListener for UvTcpListener {
fn listen(&mut self) -> Option<~StreamObject> {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
rtdebug!("entering listen");
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
let server_tcp_watcher = self.watcher();
......@@ -199,9 +200,9 @@ fn listen(&mut self) -> Option<~StreamObject> {
let client_tcp_watcher = TcpWatcher::new(&mut loop_).as_stream();
// XXX: Needs to be surfaced in interface
server_stream_watcher.accept(client_tcp_watcher);
Some(~UvStream::new(client_tcp_watcher))
Ok(~UvTcpStream::new(client_tcp_watcher))
} else {
None
Err(standard_error(OtherIoError))
};
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
......@@ -218,15 +219,15 @@ fn listen(&mut self) -> Option<~StreamObject> {
}
}
pub struct UvStream(StreamWatcher);
pub struct UvTcpStream(StreamWatcher);
impl UvStream {
fn new(watcher: StreamWatcher) -> UvStream {
UvStream(watcher)
impl UvTcpStream {
fn new(watcher: StreamWatcher) -> UvTcpStream {
UvTcpStream(watcher)
}
fn watcher(&self) -> StreamWatcher {
match self { &UvStream(w) => w }
match self { &UvTcpStream(w) => w }
}
// XXX: finalize isn't working for ~UvStream???
......@@ -236,17 +237,17 @@ fn close(&self) {
}
}
impl Drop for UvStream {
impl Drop for UvTcpStream {
fn finalize(&self) {
rtdebug!("closing stream");
//self.watcher().close(||());
}
}
impl Stream for UvStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()> {
impl RtioTcpStream for UvTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
......@@ -277,7 +278,7 @@ fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()> {
assert!(nread >= 0);
Ok(nread as uint)
} else {
Err(())
Err(standard_error(OtherIoError))
};
unsafe { (*result_cell_ptr).put_back(result); }
......@@ -291,9 +292,9 @@ fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()> {
return result_cell.take();
}
fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
......@@ -308,7 +309,7 @@ fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
let result = if status.is_none() {
Ok(())
} else {
Err(())
Err(standard_error(OtherIoError))
};
unsafe { (*result_cell_ptr).put_back(result); }
......@@ -328,7 +329,7 @@ fn test_simple_io_no_connect() {
do run_in_newsched_task {
let io = unsafe { local_sched::unsafe_borrow_io() };
let addr = next_test_ip4();
let maybe_chan = io.connect(addr);
let maybe_chan = io.tcp_connect(addr);
assert!(maybe_chan.is_err());
}
}
......@@ -342,8 +343,8 @@ fn test_simple_tcp_server_and_client() {
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut listener = io.tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert!(nread == 8);
......@@ -359,7 +360,7 @@ fn test_simple_tcp_server_and_client() {
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut stream = io.connect(addr).unwrap();
let mut stream = io.tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
......@@ -374,8 +375,8 @@ fn test_read_and_block() {
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut listener = io.tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
......@@ -412,7 +413,7 @@ fn test_read_and_block() {
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut stream = io.connect(addr).unwrap();
let mut stream = io.tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
......@@ -432,8 +433,8 @@ fn test_read_read_read() {
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut listener = io.tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let mut buf = [1, .. 2048];
let mut total_bytes_written = 0;
while total_bytes_written < MAX {
......@@ -447,7 +448,7 @@ fn test_read_read_read() {
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut stream = io.connect(addr).unwrap();
let mut stream = io.tcp_connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册