net.rs 41.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

11 12
use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use libc;
13
use std::io;
14
use std::io::IoError;
15 16
use std::io::net::ip;
use std::mem;
17
use std::ptr;
18
use std::rt::rtio;
19
use std::rt::task::BlockedTask;
20

21
use homing::{HomingIO, HomeHandle};
22
use rc::Refcount;
23 24
use stream::StreamWatcher;
use super::{Loop, Request, UvError, Buf, status_to_io_result,
25
            uv_error_to_io_error, UvHandle, slice_to_uv_buf,
26
            wait_until_woken_after, wakeup};
27
use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
28
use uvio::UvIoFactory;
29
use uvll;
30

31 32 33
////////////////////////////////////////////////////////////////////////////////
/// Generic functions related to dealing with sockaddr things
////////////////////////////////////////////////////////////////////////////////
34

35 36
/// Converts a 16-bit value from host byte order to big-endian (network)
/// byte order.
pub fn htons(u: u16) -> u16 {
    mem::to_be16(u)
}
/// Converts a 16-bit value from big-endian (network) byte order to host
/// byte order.
pub fn ntohs(u: u16) -> u16 {
    mem::from_be16(u)
}
37

38 39 40 41 42 43
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
                        len: uint) -> ip::SocketAddr {
    match storage.ss_family as c_int {
        libc::AF_INET => {
            assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
            let storage: &libc::sockaddr_in = unsafe {
A
Alex Crichton 已提交
44
                mem::transmute(storage)
45
            };
46 47 48 49 50 51 52 53
            let addr = storage.sin_addr.s_addr as u32;
            let a = (addr >>  0) as u8;
            let b = (addr >>  8) as u8;
            let c = (addr >> 16) as u8;
            let d = (addr >> 24) as u8;
            ip::SocketAddr {
                ip: ip::Ipv4Addr(a, b, c, d),
                port: ntohs(storage.sin_port),
54
            }
55 56 57 58
        }
        libc::AF_INET6 => {
            assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
            let storage: &libc::sockaddr_in6 = unsafe {
A
Alex Crichton 已提交
59
                mem::transmute(storage)
A
Alex Crichton 已提交
60
            };
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
            let a = ntohs(storage.sin6_addr.s6_addr[0]);
            let b = ntohs(storage.sin6_addr.s6_addr[1]);
            let c = ntohs(storage.sin6_addr.s6_addr[2]);
            let d = ntohs(storage.sin6_addr.s6_addr[3]);
            let e = ntohs(storage.sin6_addr.s6_addr[4]);
            let f = ntohs(storage.sin6_addr.s6_addr[5]);
            let g = ntohs(storage.sin6_addr.s6_addr[6]);
            let h = ntohs(storage.sin6_addr.s6_addr[7]);
            ip::SocketAddr {
                ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
                port: ntohs(storage.sin6_port),
            }
        }
        n => {
            fail!("unknown family {}", n);
        }
A
Alex Crichton 已提交
77
    }
78 79
}

80 81
fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
    unsafe {
82
        let mut storage: libc::sockaddr_storage = mem::init();
83 84 85
        let len = match addr.ip {
            ip::Ipv4Addr(a, b, c, d) => {
                let storage: &mut libc::sockaddr_in =
A
Alex Crichton 已提交
86
                    mem::transmute(&mut storage);
87 88 89 90 91 92 93 94 95 96 97 98
                (*storage).sin_family = libc::AF_INET as libc::sa_family_t;
                (*storage).sin_port = htons(addr.port);
                (*storage).sin_addr = libc::in_addr {
                    s_addr: (d as u32 << 24) |
                            (c as u32 << 16) |
                            (b as u32 <<  8) |
                            (a as u32 <<  0)
                };
                mem::size_of::<libc::sockaddr_in>()
            }
            ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
                let storage: &mut libc::sockaddr_in6 =
A
Alex Crichton 已提交
99
                    mem::transmute(&mut storage);
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
                storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
                storage.sin6_port = htons(addr.port);
                storage.sin6_addr = libc::in6_addr {
                    s6_addr: [
                        htons(a),
                        htons(b),
                        htons(c),
                        htons(d),
                        htons(e),
                        htons(f),
                        htons(g),
                        htons(h),
                    ]
                };
                mem::size_of::<libc::sockaddr_in6>()
            }
        };
        return (storage, len);
    }
119 120
}

121 122 123 124 125
/// Selects which libuv name query `socket_name` performs on a handle.
enum SocketNameKind {
    // Remote address of a TCP handle (`uv_tcp_getpeername`).
    TcpPeer,
    // Local address of a TCP handle (`uv_tcp_getsockname`).
    Tcp,
    // Local address of a UDP handle (`uv_udp_getsockname`).
    Udp
}
126

127 128 129 130 131 132 133
fn socket_name(sk: SocketNameKind,
               handle: *c_void) -> Result<ip::SocketAddr, IoError> {
    let getsockname = match sk {
        TcpPeer => uvll::uv_tcp_getpeername,
        Tcp     => uvll::uv_tcp_getsockname,
        Udp     => uvll::uv_udp_getsockname,
    };
134

135
    // Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
136
    let mut sockaddr: libc::sockaddr_storage = unsafe { mem::init() };
137
    let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
138

139 140 141 142 143 144
    let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
    match unsafe {
        getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
    } {
        0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
        n => Err(uv_error_to_io_error(UvError(n)))
145
    }
146
}
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161

////////////////////////////////////////////////////////////////////////////////
/// TCP implementation
////////////////////////////////////////////////////////////////////////////////

pub struct TcpWatcher {
    handle: *uvll::uv_tcp_t,
    stream: StreamWatcher,
    home: HomeHandle,
    refcount: Refcount,

    // libuv can't support concurrent reads and concurrent writes of the same
    // stream object, so we use these access guards in order to arbitrate among
    // multiple concurrent reads and writes. Note that libuv *can* read and
    // write simultaneously, it just can't read and read simultaneously.
162 163
    read_access: AccessTimeout,
    write_access: AccessTimeout,
164 165 166 167 168 169
}

pub struct TcpListener {
    home: HomeHandle,
    handle: *uvll::uv_pipe_t,
    closing_task: Option<BlockedTask>,
170 171
    outgoing: Sender<Result<Box<rtio::RtioTcpStream:Send>, IoError>>,
    incoming: Receiver<Result<Box<rtio::RtioTcpStream:Send>, IoError>>,
172 173 174
}

pub struct TcpAcceptor {
175
    listener: Box<TcpListener>,
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
    timeout: AcceptTimeout,
}

// TCP watchers (clients/streams)

impl TcpWatcher {
    /// Creates an unconnected TCP watcher homed on the event loop of `io`.
    pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
        let home = io.make_handle();
        TcpWatcher::new_home(&io.loop_, home)
    }

    /// Allocates and initializes a fresh `uv_tcp_t` on `loop_`, using `home`
    /// as the watcher's home handle.
    ///
    /// Fails if `uv_tcp_init` reports a non-zero status.
    fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
        let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
        let status = unsafe { uvll::uv_tcp_init(loop_.handle, handle) };
        assert_eq!(status, 0);
        TcpWatcher {
            home: home,
            handle: handle,
            stream: StreamWatcher::new(handle),
            refcount: Refcount::new(),
            read_access: AccessTimeout::new(),
            write_access: AccessTimeout::new(),
        }
    }

    /// Connects to `address`, with an optional `timeout` that is forwarded
    /// to `ConnectCtx::connect`.
    pub fn connect(io: &mut UvIoFactory,
                   address: ip::SocketAddr,
                   timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
        let watcher = TcpWatcher::new(io);
        let ctx = ConnectCtx { status: -1, task: None, timer: None };
        let (storage, _len) = addr_to_sockaddr(address);
        let storage_p = &storage as *_ as *libc::sockaddr;
        ctx.connect(watcher, timeout, io, |req, tcp, cb| {
            unsafe {
                uvll::uv_tcp_connect(req.handle, tcp.handle, storage_p, cb)
            }
        })
    }
}

215
impl HomingIO for TcpWatcher {
216
    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
217
}
218

219
impl rtio::RtioSocket for TcpWatcher {
220
    fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
221
        let _m = self.fire_homing_missile();
222
        socket_name(Tcp, self.handle)
223 224 225
    }
}

226 227
impl rtio::RtioTcpStream for TcpWatcher {
    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
228
        let m = self.fire_homing_missile();
229
        let guard = try!(self.read_access.grant(m));
230 231

        // see comments in close_read about this check
232
        if guard.access.is_closed() {
233 234 235
            return Err(io::standard_error(io::EndOfFile))
        }

236
        self.stream.read(buf).map_err(uv_error_to_io_error)
237
    }
238 239

    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
240
        let m = self.fire_homing_missile();
241 242
        let guard = try!(self.write_access.grant(m));
        self.stream.write(buf, guard.can_timeout).map_err(uv_error_to_io_error)
243 244
    }

245
    fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
246
        let _m = self.fire_homing_missile();
247 248
        socket_name(TcpPeer, self.handle)
    }
249

250
    fn control_congestion(&mut self) -> Result<(), IoError> {
251
        let _m = self.fire_homing_missile();
252 253 254
        status_to_io_result(unsafe {
            uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
        })
255 256
    }

257
    fn nodelay(&mut self) -> Result<(), IoError> {
258
        let _m = self.fire_homing_missile();
259 260 261
        status_to_io_result(unsafe {
            uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
        })
262 263
    }

264
    fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
265
        let _m = self.fire_homing_missile();
266 267 268 269 270
        status_to_io_result(unsafe {
            uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
                                   delay_in_seconds as c_uint)
        })
    }
271

272
    fn letdie(&mut self) -> Result<(), IoError> {
273
        let _m = self.fire_homing_missile();
274 275 276 277
        status_to_io_result(unsafe {
            uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
        })
    }
278

279
    fn clone(&self) -> Box<rtio::RtioTcpStream:Send> {
280
        box TcpWatcher {
281 282 283 284 285
            handle: self.handle,
            stream: StreamWatcher::new(self.handle),
            home: self.home.clone(),
            refcount: self.refcount.clone(),
            read_access: self.read_access.clone(),
286
            write_access: self.write_access.clone(),
287
        } as Box<rtio::RtioTcpStream:Send>
288
    }
289

290 291
    fn close_read(&mut self) -> Result<(), IoError> {
        // see comments in PipeWatcher::close_read
292 293 294 295 296 297
        let task = {
            let m = self.fire_homing_missile();
            self.read_access.access.close(&m);
    self.stream.cancel_read(uvll::EOF as libc::ssize_t)
        };
        let _ = task.map(|t| t.reawaken());
298 299
        Ok(())
    }
300

301 302 303
    fn close_write(&mut self) -> Result<(), IoError> {
        let _m = self.fire_homing_missile();
        shutdown(self.handle, &self.uv_loop())
304
    }
305 306 307 308 309 310 311 312 313 314 315 316 317

    fn set_timeout(&mut self, timeout: Option<u64>) {
        self.set_read_timeout(timeout);
        self.set_write_timeout(timeout);
    }

    fn set_read_timeout(&mut self, ms: Option<u64>) {
        let _m = self.fire_homing_missile();
        let loop_ = self.uv_loop();
        self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
                                     &self.stream as *_ as uint);

        fn cancel_read(stream: uint) -> Option<BlockedTask> {
A
Alex Crichton 已提交
318
            let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
319 320 321 322 323 324 325 326 327 328 329
            stream.cancel_read(uvll::ECANCELED as ssize_t)
        }
    }

    fn set_write_timeout(&mut self, ms: Option<u64>) {
        let _m = self.fire_homing_missile();
        let loop_ = self.uv_loop();
        self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
                                      &self.stream as *_ as uint);

        fn cancel_write(stream: uint) -> Option<BlockedTask> {
A
Alex Crichton 已提交
330
            let stream: &mut StreamWatcher = unsafe { mem::transmute(stream) };
331 332 333
            stream.cancel_write()
        }
    }
334
}
335

336 337 338 339
impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
    /// Returns the raw libuv handle held by the inner `StreamWatcher`.
    fn uv_handle(&self) -> *uvll::uv_tcp_t {
        self.stream.handle
    }
}

340 341
impl Drop for TcpWatcher {
    fn drop(&mut self) {
342
        let _m = self.fire_homing_missile();
343 344 345
        if self.refcount.decrement() {
            self.close();
        }
346 347
    }
}
348

349 350 351
// TCP listeners (unbound servers)

impl TcpListener {
352
    pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
353
                -> Result<Box<TcpListener>, UvError> {
354 355
        let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
        assert_eq!(unsafe {
356
            uvll::uv_tcp_init(io.uv_loop(), handle)
357
        }, 0);
358
        let (tx, rx) = channel();
359
        let l = box TcpListener {
360
            home: io.make_handle(),
361 362
            handle: handle,
            closing_task: None,
363 364
            outgoing: tx,
            incoming: rx,
365
        };
366 367 368 369 370
        let (addr, _len) = addr_to_sockaddr(address);
        let res = unsafe {
            let addr_p = &addr as *libc::sockaddr_storage;
            uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
        };
371 372 373 374
        return match res {
            0 => Ok(l.install()),
            n => Err(UvError(n))
        };
375
    }
376
}
377

378
impl HomingIO for TcpListener {
379
    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
380 381 382 383 384 385 386
}

impl UvHandle<uvll::uv_tcp_t> for TcpListener {
    /// Returns the raw libuv handle of the listening socket.
    fn uv_handle(&self) -> *uvll::uv_tcp_t {
        self.handle
    }
}

impl rtio::RtioSocket for TcpListener {
387
    fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
388
        let _m = self.fire_homing_missile();
389
        socket_name(Tcp, self.handle)
390
    }
391
}
392

393
impl rtio::RtioTcpListener for TcpListener {
394
    fn listen(~self) -> Result<Box<rtio::RtioTcpAcceptor:Send>, IoError> {
395
        // create the acceptor object from ourselves
396
        let mut acceptor = box TcpAcceptor {
397
            listener: self,
398
            timeout: AcceptTimeout::new(),
399
        };
400

401
        let _m = acceptor.fire_homing_missile();
402
        // FIXME: the 128 backlog should be configurable
403
        match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
404
            0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor:Send>),
405
            n => Err(uv_error_to_io_error(UvError(n))),
406
        }
407 408
    }
}
409

410
extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
411
    assert!(status != uvll::ECANCELED);
412
    let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
413 414
    let msg = match status {
        0 => {
415
            let loop_ = Loop::wrap(unsafe {
416 417
                uvll::get_loop_for_uv_handle(server)
            });
418
            let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
419
            assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
420
            Ok(box client as Box<rtio::RtioTcpStream:Send>)
421
        }
422 423 424 425 426 427 428
        n => Err(uv_error_to_io_error(UvError(n)))
    };
    tcp.outgoing.send(msg);
}

impl Drop for TcpListener {
    fn drop(&mut self) {
429 430
        let _m = self.fire_homing_missile();
        self.close();
431 432 433
    }
}

434 435 436
// TCP acceptors (bound servers)

impl HomingIO for TcpAcceptor {
437
    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
438
}
439

440
impl rtio::RtioSocket for TcpAcceptor {
441
    fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
442
        let _m = self.fire_homing_missile();
443 444 445
        socket_name(Tcp, self.listener.handle)
    }
}
446

447
impl rtio::RtioTcpAcceptor for TcpAcceptor {
448
    fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream:Send>, IoError> {
449
        self.timeout.accept(&self.listener.incoming)
450 451
    }

452
    fn accept_simultaneously(&mut self) -> Result<(), IoError> {
453
        let _m = self.fire_homing_missile();
454 455 456
        status_to_io_result(unsafe {
            uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
        })
457 458
    }

459
    fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
460
        let _m = self.fire_homing_missile();
461 462 463
        status_to_io_result(unsafe {
            uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
        })
464
    }
465 466

    fn set_timeout(&mut self, ms: Option<u64>) {
467
        let _m = self.fire_homing_missile();
468 469 470
        match ms {
            None => self.timeout.clear(),
            Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
471 472
        }
    }
473 474
}

475 476 477 478 479 480
////////////////////////////////////////////////////////////////////////////////
/// UDP implementation
////////////////////////////////////////////////////////////////////////////////

pub struct UdpWatcher {
    handle: *uvll::uv_udp_t,
481
    home: HomeHandle,
482 483

    // See above for what these fields are
484
    refcount: Refcount,
485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500
    read_access: AccessTimeout,
    write_access: AccessTimeout,

    blocked_sender: Option<BlockedTask>,
}

/// Per-call state for `recvfrom`, attached to the UDP handle as its data
/// pointer while a receive is pending.
struct UdpRecvCtx {
    // Task blocked in `recvfrom`; woken by `recv_cb` or taken by a read
    // timeout.
    task: Option<BlockedTask>,
    // Destination buffer handed to libuv by `alloc_cb`; put back by
    // `recv_cb` when a callback arrives with no data.
    buf: Option<Buf>,
    // Byte count (or negative error code) and sender address, once known.
    result: Option<(ssize_t, Option<ip::SocketAddr>)>,
}

/// Per-request state for `sendto`, attached to the send request as its data.
struct UdpSendCtx {
    // Status reported by `send_cb`; starts as `ECANCELED` in `sendto`.
    result: c_int,
    // Owned copy of the payload, present only when the write can time out.
    data: Option<Vec<u8>>,
    // The sending watcher, or null once the sender has stopped waiting (in
    // which case `send_cb` frees this context instead of waking a task).
    udp: *mut UdpWatcher,
}

impl UdpWatcher {
504
    pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
505
                -> Result<UdpWatcher, UvError> {
506 507
        let udp = UdpWatcher {
            handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
508
            home: io.make_handle(),
509
            refcount: Refcount::new(),
510 511 512
            read_access: AccessTimeout::new(),
            write_access: AccessTimeout::new(),
            blocked_sender: None,
513 514
        };
        assert_eq!(unsafe {
515
            uvll::uv_udp_init(io.uv_loop(), udp.handle)
516
        }, 0);
517 518 519 520 521
        let (addr, _len) = addr_to_sockaddr(address);
        let result = unsafe {
            let addr_p = &addr as *libc::sockaddr_storage;
            uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
        };
522 523 524 525
        return match result {
            0 => Ok(udp),
            n => Err(UvError(n)),
        };
526
    }
527 528
}

529 530 531 532
impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
    /// Returns the raw libuv handle of the UDP socket.
    fn uv_handle(&self) -> *uvll::uv_udp_t {
        self.handle
    }
}

533
impl HomingIO for UdpWatcher {
534
    fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
535 536 537
}

impl rtio::RtioSocket for UdpWatcher {
538
    fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
539
        let _m = self.fire_homing_missile();
540
        socket_name(Udp, self.handle)
541 542 543
    }
}

544 545
impl rtio::RtioUdpSocket for UdpWatcher {
    fn recvfrom(&mut self, buf: &mut [u8])
546
        -> Result<(uint, ip::SocketAddr), IoError>
547
    {
548
        let loop_ = self.uv_loop();
549
        let m = self.fire_homing_missile();
550
        let _guard = try!(self.read_access.grant(m));
551

552
        return match unsafe {
553 554 555
            uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
        } {
            0 => {
556
                let mut cx = UdpRecvCtx {
557 558 559 560
                    task: None,
                    buf: Some(slice_to_uv_buf(buf)),
                    result: None,
                };
561
                let handle = self.handle;
562
                wait_until_woken_after(&mut cx.task, &loop_, || {
563
                    unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
564
                });
565 566 567
                match cx.result.take_unwrap() {
                    (n, _) if n < 0 =>
                        Err(uv_error_to_io_error(UvError(n as c_int))),
A
Alex Crichton 已提交
568
                    (n, addr) => Ok((n as uint, addr.unwrap()))
569 570 571 572 573 574
                }
            }
            n => Err(uv_error_to_io_error(UvError(n)))
        };

        extern fn alloc_cb(handle: *uvll::uv_udp_t,
A
Alex Crichton 已提交
575 576 577
                           _suggested_size: size_t,
                           buf: *mut Buf) {
            unsafe {
578 579
                let cx = uvll::get_data_for_uv_handle(handle);
                let cx = &mut *(cx as *mut UdpRecvCtx);
A
Alex Crichton 已提交
580 581
                *buf = cx.buf.take().expect("recv alloc_cb called more than once")
            }
582
        }
583

A
Alex Crichton 已提交
584
        extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
585
                          addr: *libc::sockaddr, _flags: c_uint) {
586
            assert!(nread != uvll::ECANCELED as ssize_t);
587
            let cx = unsafe {
588
                &mut *(uvll::get_data_for_uv_handle(handle) as *mut UdpRecvCtx)
589
            };
590

591 592 593
            // When there's no data to read the recv callback can be a no-op.
            // This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
            // this we just drop back to kqueue and wait for the next callback.
594
            if nread == 0 {
A
Alex Crichton 已提交
595
                cx.buf = Some(unsafe { *buf });
596 597
                return
            }
598

599
            unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) }
A
Alex Crichton 已提交
600 601 602
            let addr = if addr == ptr::null() {
                None
            } else {
603
                let len = mem::size_of::<libc::sockaddr_storage>();
A
Alex Crichton 已提交
604
                Some(sockaddr_to_addr(unsafe { mem::transmute(addr) }, len))
A
Alex Crichton 已提交
605
            };
606
            cx.result = Some((nread, addr));
607
            wakeup(&mut cx.task);
608
        }
609 610
    }

611
    fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
612
        let m = self.fire_homing_missile();
613
        let loop_ = self.uv_loop();
614
        let guard = try!(self.write_access.grant(m));
615

616
        let mut req = Request::new(uvll::UV_UDP_SEND);
617
        let (addr, _len) = addr_to_sockaddr(dst);
618 619 620 621 622 623 624 625 626
        let addr_p = &addr as *_ as *libc::sockaddr;

        // see comments in StreamWatcher::write for why we may allocate a buffer
        // here.
        let data = if guard.can_timeout {Some(Vec::from_slice(buf))} else {None};
        let uv_buf = if guard.can_timeout {
            slice_to_uv_buf(data.get_ref().as_slice())
        } else {
            slice_to_uv_buf(buf)
627
        };
628

629 630 631
        return match unsafe {
            uvll::uv_udp_send(req.handle, self.handle, [uv_buf], addr_p, send_cb)
        } {
632
            0 => {
633
                req.defuse(); // uv callback now owns this request
634 635 636 637
                let mut cx = UdpSendCtx {
                    result: uvll::ECANCELED, data: data, udp: self as *mut _
                };
                wait_until_woken_after(&mut self.blocked_sender, &loop_, || {
638
                    req.set_data(&cx);
639
                });
640 641 642 643 644 645

                if cx.result != uvll::ECANCELED {
                    return match cx.result {
                        0 => Ok(()),
                        n => Err(uv_error_to_io_error(UvError(n)))
                    }
646
                }
A
Alex Crichton 已提交
647
                let new_cx = box UdpSendCtx {
648 649 650 651 652 653
                    result: 0,
                    udp: 0 as *mut UdpWatcher,
                    data: cx.data.take(),
                };
                unsafe {
                    req.set_data(&*new_cx);
A
Alex Crichton 已提交
654
                    mem::forget(new_cx);
655 656
                }
                Err(uv_error_to_io_error(UvError(cx.result)))
657 658 659 660
            }
            n => Err(uv_error_to_io_error(UvError(n)))
        };

661 662
        // This function is the same as stream::write_cb, but adapted for udp
        // instead of streams.
663 664
        extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
            let req = Request::wrap(req);
665
            let cx: &mut UdpSendCtx = unsafe { req.get_data() };
666
            cx.result = status;
667 668 669 670 671

            if cx.udp as uint != 0 {
                let udp: &mut UdpWatcher = unsafe { &mut *cx.udp };
                wakeup(&mut udp.blocked_sender);
            } else {
A
Alex Crichton 已提交
672
                let _cx: Box<UdpSendCtx> = unsafe { mem::transmute(cx) };
673
            }
674 675 676
        }
    }

677
    fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
678
        let _m = self.fire_homing_missile();
679
        status_to_io_result(unsafe {
680
            multi.to_str().with_c_str(|m_addr| {
681 682 683
                uvll::uv_udp_set_membership(self.handle,
                                            m_addr, ptr::null(),
                                            uvll::UV_JOIN_GROUP)
684
            })
685
        })
686 687
    }

688
    fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
689
        let _m = self.fire_homing_missile();
690
        status_to_io_result(unsafe {
691
            multi.to_str().with_c_str(|m_addr| {
692 693 694
                uvll::uv_udp_set_membership(self.handle,
                                            m_addr, ptr::null(),
                                            uvll::UV_LEAVE_GROUP)
695
            })
696
        })
697
    }
698 699

    fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
700
        let _m = self.fire_homing_missile();
701 702 703 704
        status_to_io_result(unsafe {
            uvll::uv_udp_set_multicast_loop(self.handle,
                                            1 as c_int)
        })
705 706
    }

707
    fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
708
        let _m = self.fire_homing_missile();
709 710 711 712 713
        status_to_io_result(unsafe {
            uvll::uv_udp_set_multicast_loop(self.handle,
                                            0 as c_int)
        })
    }
714

715
    fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
716
        let _m = self.fire_homing_missile();
717 718 719 720
        status_to_io_result(unsafe {
            uvll::uv_udp_set_multicast_ttl(self.handle,
                                           ttl as c_int)
        })
721 722
    }

723
    fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
724
        let _m = self.fire_homing_missile();
725 726 727
        status_to_io_result(unsafe {
            uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
        })
728 729
    }

730
    fn hear_broadcasts(&mut self) -> Result<(), IoError> {
731
        let _m = self.fire_homing_missile();
732 733 734 735
        status_to_io_result(unsafe {
            uvll::uv_udp_set_broadcast(self.handle,
                                       1 as c_int)
        })
736 737
    }

738
    fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
739
        let _m = self.fire_homing_missile();
740 741 742 743
        status_to_io_result(unsafe {
            uvll::uv_udp_set_broadcast(self.handle,
                                       0 as c_int)
        })
744
    }
745

746
    fn clone(&self) -> Box<rtio::RtioUdpSocket:Send> {
747
        box UdpWatcher {
748 749 750 751 752
            handle: self.handle,
            home: self.home.clone(),
            refcount: self.refcount.clone(),
            write_access: self.write_access.clone(),
            read_access: self.read_access.clone(),
753
            blocked_sender: None,
754
        } as Box<rtio::RtioUdpSocket:Send>
755
    }
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790

    fn set_timeout(&mut self, timeout: Option<u64>) {
        self.set_read_timeout(timeout);
        self.set_write_timeout(timeout);
    }

    fn set_read_timeout(&mut self, ms: Option<u64>) {
        let _m = self.fire_homing_missile();
        let loop_ = self.uv_loop();
        self.read_access.set_timeout(ms, &self.home, &loop_, cancel_read,
                                     self.handle as uint);

        fn cancel_read(stream: uint) -> Option<BlockedTask> {
            // This method is quite similar to StreamWatcher::cancel_read, see
            // there for more information
            let handle = stream as *uvll::uv_udp_t;
            assert_eq!(unsafe { uvll::uv_udp_recv_stop(handle) }, 0);
            let data = unsafe {
                let data = uvll::get_data_for_uv_handle(handle);
                if data.is_null() { return None }
                uvll::set_data_for_uv_handle(handle, 0 as *int);
                &mut *(data as *mut UdpRecvCtx)
            };
            data.result = Some((uvll::ECANCELED as ssize_t, None));
            data.task.take()
        }
    }

    fn set_write_timeout(&mut self, ms: Option<u64>) {
        let _m = self.fire_homing_missile();
        let loop_ = self.uv_loop();
        self.write_access.set_timeout(ms, &self.home, &loop_, cancel_write,
                                      self as *mut _ as uint);

        fn cancel_write(stream: uint) -> Option<BlockedTask> {
A
Alex Crichton 已提交
791
            let stream: &mut UdpWatcher = unsafe { mem::transmute(stream) };
792 793 794
            stream.blocked_sender.take()
        }
    }
795 796 797 798 799
}

impl Drop for UdpWatcher {
    fn drop(&mut self) {
        // Send ourselves home to close this handle (blocking while doing so).
800
        let _m = self.fire_homing_missile();
801 802 803
        if self.refcount.decrement() {
            self.close();
        }
804 805 806
    }
}

////////////////////////////////////////////////////////////////////////////////
// Shutdown helper
////////////////////////////////////////////////////////////////////////////////

/// Issues a `uv_shutdown` request on `handle` and waits (via
/// `wait_until_woken_after`) until the shutdown callback has fired.
///
/// Returns `Err` immediately if `uv_shutdown` itself reports a non-zero
/// code; otherwise the status delivered to the callback is converted with
/// `status_to_io_result`.
pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
    // State shared between the waiting caller and the uv callback.
    struct Ctx {
        slot: Option<BlockedTask>,
        status: c_int,
    }

    // Records the status in the context attached to the request and wakes up
    // whatever is parked in the context's slot.
    extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
        let request = Request::wrap(req);
        assert!(status != uvll::ECANCELED);
        let ctx: &mut Ctx = unsafe { request.get_data() };
        ctx.status = status;
        wakeup(&mut ctx.slot);
    }

    let mut req = Request::new(uvll::UV_SHUTDOWN);

    let rc = unsafe { uvll::uv_shutdown(req.handle, handle, shutdown_cb) };
    if rc != 0 {
        return Err(uv_error_to_io_error(UvError(rc)));
    }

    req.defuse(); // uv callback now owns this request
    let mut cx = Ctx { slot: None, status: 0 };

    // Attach the context to the request from inside the closure passed to
    // `wait_until_woken_after`; the callback reads it back via `get_data`.
    wait_until_woken_after(&mut cx.slot, loop_, || {
        req.set_data(&cx);
    });

    status_to_io_result(cx.status)
}

841 842
#[cfg(test)]
mod test {
A
Alex Crichton 已提交
843 844
    use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
                        RtioUdpSocket};
845
    use std::io::test::{next_test_ip4, next_test_ip6};
A
Alex Crichton 已提交
846

847
    use super::{UdpWatcher, TcpWatcher, TcpListener};
848
    use super::super::local_loop;
849 850 851

    #[test]
    fn connect_close_ip4() {
852
        match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
A
Alex Crichton 已提交
853
            Ok(..) => fail!(),
854
            Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_strbuf()),
855 856 857 858 859
        }
    }

    #[test]
    fn connect_close_ip6() {
860
        match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
A
Alex Crichton 已提交
861
            Ok(..) => fail!(),
862
            Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_strbuf()),
863 864 865 866 867
        }
    }

    #[test]
    fn udp_bind_close_ip4() {
868
        match UdpWatcher::bind(local_loop(), next_test_ip4()) {
A
Alex Crichton 已提交
869 870
            Ok(..) => {}
            Err(..) => fail!()
871 872 873 874 875
        }
    }

    #[test]
    fn udp_bind_close_ip6() {
876
        match UdpWatcher::bind(local_loop(), next_test_ip6()) {
A
Alex Crichton 已提交
877 878
            Ok(..) => {}
            Err(..) => fail!()
879 880 881 882 883
        }
    }

    #[test]
    fn listen_ip4() {
884
        let (tx, rx) = channel();
885
        let addr = next_test_ip4();
886

887
        spawn(proc() {
888
            let w = match TcpListener::bind(local_loop(), addr) {
A
Alex Crichton 已提交
889
                Ok(w) => w, Err(e) => fail!("{:?}", e)
890
            };
891 892 893
            let mut w = match w.listen() {
                Ok(w) => w, Err(e) => fail!("{:?}", e),
            };
894
            tx.send(());
895 896 897 898 899 900 901
            match w.accept() {
                Ok(mut stream) => {
                    let mut buf = [0u8, ..10];
                    match stream.read(buf) {
                        Ok(10) => {} e => fail!("{:?}", e),
                    }
                    for i in range(0, 10u8) {
902
                        assert_eq!(buf[i as uint], i + 1);
903 904 905
                    }
                }
                Err(e) => fail!("{:?}", e)
A
Alex Crichton 已提交
906
            }
907
        });
908

909
        rx.recv();
910
        let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
911 912 913 914 915
            Ok(w) => w, Err(e) => fail!("{:?}", e)
        };
        match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
            Ok(()) => {}, Err(e) => fail!("{:?}", e)
        }
916 917 918 919
    }

    #[test]
    fn listen_ip6() {
920
        let (tx, rx) = channel();
921
        let addr = next_test_ip6();
A
Alex Crichton 已提交
922

923
        spawn(proc() {
924
            let w = match TcpListener::bind(local_loop(), addr) {
A
Alex Crichton 已提交
925
                Ok(w) => w, Err(e) => fail!("{:?}", e)
926
            };
927 928 929
            let mut w = match w.listen() {
                Ok(w) => w, Err(e) => fail!("{:?}", e),
            };
930
            tx.send(());
931 932 933 934 935 936 937
            match w.accept() {
                Ok(mut stream) => {
                    let mut buf = [0u8, ..10];
                    match stream.read(buf) {
                        Ok(10) => {} e => fail!("{:?}", e),
                    }
                    for i in range(0, 10u8) {
938
                        assert_eq!(buf[i as uint], i + 1);
939 940 941
                    }
                }
                Err(e) => fail!("{:?}", e)
A
Alex Crichton 已提交
942
            }
943
        });
944

945
        rx.recv();
946
        let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
947 948 949 950 951
            Ok(w) => w, Err(e) => fail!("{:?}", e)
        };
        match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
            Ok(()) => {}, Err(e) => fail!("{:?}", e)
        }
A
Alex Crichton 已提交
952 953 954 955
    }

    #[test]
    fn udp_recv_ip4() {
956
        let (tx, rx) = channel();
957 958 959
        let client = next_test_ip4();
        let server = next_test_ip4();

960
        spawn(proc() {
961 962
            match UdpWatcher::bind(local_loop(), server) {
                Ok(mut w) => {
963
                    tx.send(());
964 965 966 967 968 969
                    let mut buf = [0u8, ..10];
                    match w.recvfrom(buf) {
                        Ok((10, addr)) => assert_eq!(addr, client),
                        e => fail!("{:?}", e),
                    }
                    for i in range(0, 10u8) {
970
                        assert_eq!(buf[i as uint], i + 1);
971 972
                    }
                }
973
                Err(e) => fail!("{:?}", e)
A
Alex Crichton 已提交
974
            }
975
        });
A
Alex Crichton 已提交
976

977
        rx.recv();
978 979 980 981 982
        let mut w = match UdpWatcher::bind(local_loop(), client) {
            Ok(w) => w, Err(e) => fail!("{:?}", e)
        };
        match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
            Ok(()) => {}, Err(e) => fail!("{:?}", e)
A
Alex Crichton 已提交
983 984 985 986 987
        }
    }

    #[test]
    fn udp_recv_ip6() {
988
        let (tx, rx) = channel();
989 990 991
        let client = next_test_ip6();
        let server = next_test_ip6();

992
        spawn(proc() {
993 994
            match UdpWatcher::bind(local_loop(), server) {
                Ok(mut w) => {
995
                    tx.send(());
996 997 998 999 1000 1001
                    let mut buf = [0u8, ..10];
                    match w.recvfrom(buf) {
                        Ok((10, addr)) => assert_eq!(addr, client),
                        e => fail!("{:?}", e),
                    }
                    for i in range(0, 10u8) {
1002
                        assert_eq!(buf[i as uint], i + 1);
A
Alex Crichton 已提交
1003 1004
                    }
                }
1005
                Err(e) => fail!("{:?}", e)
A
Alex Crichton 已提交
1006
            }
1007
        });
1008

1009
        rx.recv();
1010 1011 1012 1013 1014
        let mut w = match UdpWatcher::bind(local_loop(), client) {
            Ok(w) => w, Err(e) => fail!("{:?}", e)
        };
        match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
            Ok(()) => {}, Err(e) => fail!("{:?}", e)
1015 1016 1017 1018
        }
    }

    #[test]
A
Alex Crichton 已提交
1019
    fn test_read_read_read() {
1020 1021
        let addr = next_test_ip4();
        static MAX: uint = 5000;
1022
        let (tx, rx) = channel();
1023

1024
        spawn(proc() {
1025 1026
            let listener = TcpListener::bind(local_loop(), addr).unwrap();
            let mut acceptor = listener.listen().unwrap();
1027
            tx.send(());
1028 1029 1030 1031 1032 1033 1034
            let mut stream = acceptor.accept().unwrap();
            let buf = [1, .. 2048];
            let mut total_bytes_written = 0;
            while total_bytes_written < MAX {
                assert!(stream.write(buf).is_ok());
                uvdebug!("wrote bytes");
                total_bytes_written += buf.len();
A
Alex Crichton 已提交
1035
            }
1036
        });
A
Alex Crichton 已提交
1037

1038
        rx.recv();
1039
        let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1040 1041 1042 1043 1044 1045 1046
        let mut buf = [0, .. 2048];
        let mut total_bytes_read = 0;
        while total_bytes_read < MAX {
            let nread = stream.read(buf).unwrap();
            total_bytes_read += nread;
            for i in range(0u, nread) {
                assert_eq!(buf[i], 1);
A
Alex Crichton 已提交
1047 1048
            }
        }
1049
        uvdebug!("read {} bytes total", total_bytes_read);
A
Alex Crichton 已提交
1050 1051 1052
    }

    #[test]
A
Alex Crichton 已提交
1053
    #[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
A
Alex Crichton 已提交
1054
    fn test_udp_twice() {
1055 1056
        let server_addr = next_test_ip4();
        let client_addr = next_test_ip4();
1057
        let (tx, rx) = channel();
1058

1059
        spawn(proc() {
1060
            let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
1061
            rx.recv();
1062 1063
            assert!(client.sendto([1], server_addr).is_ok());
            assert!(client.sendto([2], server_addr).is_ok());
1064
        });
1065 1066

        let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
1067
        tx.send(());
1068 1069 1070 1071 1072 1073 1074 1075 1076 1077
        let mut buf1 = [0];
        let mut buf2 = [0];
        let (nread1, src1) = server.recvfrom(buf1).unwrap();
        let (nread2, src2) = server.recvfrom(buf2).unwrap();
        assert_eq!(nread1, 1);
        assert_eq!(nread2, 1);
        assert_eq!(src1, client_addr);
        assert_eq!(src2, client_addr);
        assert_eq!(buf1[0], 1);
        assert_eq!(buf2[0], 2);
A
Alex Crichton 已提交
1078
    }
1079

A
Alex Crichton 已提交
1080 1081
    #[test]
    fn test_udp_many_read() {
1082 1083 1084 1085 1086 1087
        let server_out_addr = next_test_ip4();
        let server_in_addr = next_test_ip4();
        let client_out_addr = next_test_ip4();
        let client_in_addr = next_test_ip4();
        static MAX: uint = 500_000;

1088 1089
        let (tx1, rx1) = channel::<()>();
        let (tx2, rx2) = channel::<()>();
1090

1091
        spawn(proc() {
1092 1093 1094
            let l = local_loop();
            let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
            let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
1095 1096 1097
            let (tx, rx) = (tx2, rx1);
            tx.send(());
            rx.recv();
1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110
            let msg = [1, .. 2048];
            let mut total_bytes_sent = 0;
            let mut buf = [1];
            while buf[0] == 1 {
                // send more data
                assert!(server_out.sendto(msg, client_in_addr).is_ok());
                total_bytes_sent += msg.len();
                // check if the client has received enough
                let res = server_in.recvfrom(buf);
                assert!(res.is_ok());
                let (nread, src) = res.unwrap();
                assert_eq!(nread, 1);
                assert_eq!(src, client_out_addr);
A
Alex Crichton 已提交
1111
            }
1112
            assert!(total_bytes_sent >= MAX);
1113
        });
1114

1115 1116 1117
        let l = local_loop();
        let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
        let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
1118 1119 1120
        let (tx, rx) = (tx1, rx2);
        rx.recv();
        tx.send(());
1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133
        let mut total_bytes_recv = 0;
        let mut buf = [0, .. 2048];
        while total_bytes_recv < MAX {
            // ask for more
            assert!(client_out.sendto([1], server_in_addr).is_ok());
            // wait for data
            let res = client_in.recvfrom(buf);
            assert!(res.is_ok());
            let (nread, src) = res.unwrap();
            assert_eq!(src, server_out_addr);
            total_bytes_recv += nread;
            for i in range(0u, nread) {
                assert_eq!(buf[i], 1);
A
Alex Crichton 已提交
1134 1135
            }
        }
1136 1137
        // tell the server we're done
        assert!(client_out.sendto([0], server_in_addr).is_ok());
A
Alex Crichton 已提交
1138 1139 1140 1141
    }

    #[test]
    fn test_read_and_block() {
1142
        let addr = next_test_ip4();
1143
        let (tx, rx) = channel::<Receiver<()>>();
1144

1145
        spawn(proc() {
1146
            let rx = rx.recv();
1147
            let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1148 1149
            stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
            stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1150
            rx.recv();
1151 1152
            stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
            stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1153
            rx.recv();
1154
        });
1155 1156 1157

        let listener = TcpListener::bind(local_loop(), addr).unwrap();
        let mut acceptor = listener.listen().unwrap();
1158 1159
        let (tx2, rx2) = channel();
        tx.send(rx2);
1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175
        let mut stream = acceptor.accept().unwrap();
        let mut buf = [0, .. 2048];

        let expected = 32;
        let mut current = 0;
        let mut reads = 0;

        while current < expected {
            let nread = stream.read(buf).unwrap();
            for i in range(0u, nread) {
                let val = buf[i] as uint;
                assert_eq!(val, current % 8);
                current += 1;
            }
            reads += 1;

1176
            let _ = tx2.send_opt(());
1177 1178 1179 1180
        }

        // Make sure we had multiple reads
        assert!(reads > 1);
A
Alex Crichton 已提交
1181
    }
1182

A
Alex Crichton 已提交
1183 1184 1185 1186
    #[test]
    fn test_simple_tcp_server_and_client_on_diff_threads() {
        let addr = next_test_ip4();

1187
        spawn(proc() {
1188 1189 1190 1191 1192 1193 1194 1195
            let listener = TcpListener::bind(local_loop(), addr).unwrap();
            let mut acceptor = listener.listen().unwrap();
            let mut stream = acceptor.accept().unwrap();
            let mut buf = [0, .. 2048];
            let nread = stream.read(buf).unwrap();
            assert_eq!(nread, 8);
            for i in range(0u, nread) {
                assert_eq!(buf[i], i as u8);
1196
            }
1197
        });
1198

1199
        let mut stream = TcpWatcher::connect(local_loop(), addr, None);
1200
        while stream.is_err() {
1201
            stream = TcpWatcher::connect(local_loop(), addr, None);
1202
        }
1203
        stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
1204
    }
A
Alex Crichton 已提交
1205

1206 1207 1208 1209 1210 1211 1212 1213 1214 1215
    #[should_fail] #[test]
    fn tcp_listener_fail_cleanup() {
        let addr = next_test_ip4();
        let w = TcpListener::bind(local_loop(), addr).unwrap();
        let _w = w.listen().unwrap();
        fail!();
    }

    #[should_fail] #[test]
    fn tcp_stream_fail_cleanup() {
1216
        let (tx, rx) = channel();
1217 1218
        let addr = next_test_ip4();

1219
        spawn(proc() {
1220 1221
            let w = TcpListener::bind(local_loop(), addr).unwrap();
            let mut w = w.listen().unwrap();
1222
            tx.send(());
1223
            drop(w.accept().unwrap());
1224
        });
1225
        rx.recv();
1226
        let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239
        fail!();
    }

    #[should_fail] #[test]
    fn udp_listener_fail_cleanup() {
        let addr = next_test_ip4();
        let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
        fail!();
    }

    #[should_fail] #[test]
    fn udp_fail_other_task() {
        let addr = next_test_ip4();
1240
        let (tx, rx) = channel();
1241 1242 1243 1244

        // force the handle to be created on a different scheduler, failure in
        // the original task will force a homing operation back to this
        // scheduler.
1245
        spawn(proc() {
1246
            let w = UdpWatcher::bind(local_loop(), addr).unwrap();
1247
            tx.send(w);
1248
        });
1249

1250
        let _w = rx.recv();
1251 1252
        fail!();
    }
1253
}