net_tcp.rs 59.1 KB
Newer Older
1
//! High-level interface to libuv's TCP functionality
2

P
Patrick Walton 已提交
3 4 5 6
use ip = net_ip;
use uv::iotask;
use uv::iotask::IoTask;
use future_spawn = future::spawn;
J
Jeff Olson 已提交
7 8
// FIXME #1935
// should be able to, but can't atm, replace w/ result::{result, extensions};
P
Patrick Walton 已提交
9 10 11 12
use result::*;
use libc::size_t;
use io::{Reader, ReaderUtil, Writer};
use comm = core::comm;
J
Jeff Olson 已提交
13

14
// tcp interfaces
B
Brian Anderson 已提交
15
export TcpSocket;
16
// buffered socket
17
export TcpSocketBuf, socket_buf;
18
// errors
B
Brian Anderson 已提交
19
export TcpErrData, TcpConnectErrData;
20
// operations on a tcp_socket
21
export write, write_future, read_start, read_stop;
22
// tcp server stuff
23
export listen, accept;
24 25
// tcp client stuff
export connect;
J
Jeff Olson 已提交
26

27
#[nolink]
28
extern mod rustrt {
29
    fn rust_uv_current_kernel_malloc(size: libc::c_uint) -> *libc::c_void;
30
    fn rust_uv_current_kernel_free(mem: *libc::c_void);
31
    fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
32 33
}

34 35 36 37 38 39 40
/**
 * Encapsulates an open TCP/IP connection through libuv
 *
 * `tcp_socket` is non-copyable/sendable and automagically handles closing the
 * underlying libuv data structures when it goes out of scope. This is the
 * data structure that is used for read/write operations over a TCP stream.
 */
B
Brian Anderson 已提交
41
struct TcpSocket {
42
  socket_data: @TcpSocketData,
43
  drop {
44
    unsafe {
45
        tear_down_socket_data(self.socket_data)
46 47
    }
  }
J
Jeff Olson 已提交
48 49
}

B
Brian Anderson 已提交
50 51 52 53 54 55
fn TcpSocket(socket_data: @TcpSocketData) -> TcpSocket {
    TcpSocket {
        socket_data: socket_data
    }
}

56 57 58 59
/**
 * A buffered wrapper for `net::tcp::tcp_socket`
 *
 * It is created with a call to `net::tcp::socket_buf()` and has impls that
60
 * satisfy both the `io::reader` and `io::writer` traits.
61
 */
B
Brian Anderson 已提交
62
struct TcpSocketBuf {
63
    data: @TcpBufferedSocketData,
B
Brian Anderson 已提交
64 65 66 67 68 69
}

fn TcpSocketBuf(data: @TcpBufferedSocketData) -> TcpSocketBuf {
    TcpSocketBuf {
        data: data
    }
70 71
}

72
/// Contains raw, string-based, error information returned from libuv
B
Brian Anderson 已提交
73
type TcpErrData = {
74 75
    err_name: ~str,
    err_msg: ~str
76
};
77
/// Details returned as part of a `result::err` result from `tcp::listen`
B
Brian Anderson 已提交
78
enum TcpListenErrData {
79 80 81 82
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
B
Brian Anderson 已提交
83
    GenericListenErr(~str, ~str),
84 85 86 87 88 89 90
    /**
     * Failed to bind to the requested IP/Port, because it is already in use.
     *
     * # Possible Causes
     *
     * * Attempting to bind to a port already bound to another listener
     */
B
Brian Anderson 已提交
91
    AddressInUse,
92 93 94 95 96 97 98 99 100 101
    /**
     * Request to bind to an IP/Port was denied by the system.
     *
     * # Possible Causes
     *
     * * Attemping to binding to an IP/Port as a non-Administrator
     *   on Windows Vista+
     * * Attempting to bind, as a non-priv'd
     *   user, to 'privileged' ports (< 1024) on *nix
     */
B
Brian Anderson 已提交
102
    AccessDenied
103
}
104
/// Details returned as part of a `result::err` result from `tcp::connect`
B
Brian Anderson 已提交
105
enum TcpConnectErrData {
106 107 108 109
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
B
Brian Anderson 已提交
110
    GenericConnectErr(~str, ~str),
111
    /// Invalid IP or invalid port
B
Brian Anderson 已提交
112
    ConnectionRefused
113
}
114

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
/**
 * Initiate a client connection over TCP/IP
 *
 * # Arguments
 *
 * * `input_ip` - The IP address (versions 4 or 6) of the remote host
 * * `port` - the unsigned integer of the desired remote host port
 * * `iotask` - a `uv::iotask` that the tcp request will run on
 *
 * # Returns
 *
 * A `result` that, if the operation succeeds, contains a
 * `net::net::tcp_socket` that can be used to send and receive data to/from
 * the remote host. In the event of failure, a
 * `net::tcp::tcp_connect_err_data` instance will be returned
 */
B
Brian Anderson 已提交
131
fn connect(-input_ip: ip::IpAddr, port: uint,
B
Brian Anderson 已提交
132
           iotask: IoTask)
B
Brian Anderson 已提交
133 134
    -> result::Result<TcpSocket, TcpConnectErrData> unsafe {
    let result_po = core::comm::Port::<ConnAttempt>();
135
    let closed_signal_po = core::comm::Port::<()>();
J
Jeff Olson 已提交
136
    let conn_data = {
137 138
        result_ch: core::comm::Chan(result_po),
        closed_signal_ch: core::comm::Chan(closed_signal_po)
J
Jeff Olson 已提交
139 140
    };
    let conn_data_ptr = ptr::addr_of(conn_data);
B
Brian Anderson 已提交
141
    let reader_po = core::comm::Port::<result::Result<~[u8], TcpErrData>>();
142 143 144
    let stream_handle_ptr = malloc_uv_tcp_t();
    *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
    let socket_data = @{
145
        reader_po: reader_po,
146
        reader_ch: core::comm::Chan(reader_po),
147 148 149
        stream_handle_ptr: stream_handle_ptr,
        connect_req: uv::ll::connect_t(),
        write_req: uv::ll::write_t(),
150
        iotask: iotask
J
Jeff Olson 已提交
151
    };
152
    let socket_data_ptr = ptr::addr_of(*socket_data);
P
Paul Stansifer 已提交
153
    log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
J
Jeff Olson 已提交
154 155
    // get an unsafe representation of our stream_handle_ptr that
    // we can send into the interact cb to be handled in libuv..
P
Paul Stansifer 已提交
156 157
    log(debug, fmt!("stream_handle_ptr outside interact %?",
        stream_handle_ptr));
158
    do iotask::interact(iotask) |loop_ptr| unsafe {
159
        log(debug, ~"in interact cb for tcp client connect..");
P
Paul Stansifer 已提交
160 161
        log(debug, fmt!("stream_handle_ptr in interact %?",
            stream_handle_ptr));
162
        match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
B
Brian Anderson 已提交
163
          0i32 => {
164
            log(debug, ~"tcp_init successful");
165 166 167
            log(debug, ~"dealing w/ ipv4 connection..");
            let connect_req_ptr =
                ptr::addr_of((*socket_data_ptr).connect_req);
K
Kevin Cantu 已提交
168
            let addr_str = ip::format_addr(&input_ip);
169
            let connect_result = match input_ip {
B
Brian Anderson 已提交
170
              ip::Ipv4(addr) => {
171 172 173 174 175 176 177 178 179 180 181 182 183
                // have to "recreate" the sockaddr_in/6
                // since the ip_addr discards the port
                // info.. should probably add an additional
                // rust type that actually is closer to
                // what the libuv API expects (ip str + port num)
                log(debug, fmt!("addr: %?", addr));
                let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                uv::ll::tcp_connect(
                    connect_req_ptr,
                    stream_handle_ptr,
                    ptr::addr_of(in_addr),
                    tcp_connect_on_connect_cb)
              }
B
Brian Anderson 已提交
184
              ip::Ipv6(addr) => {
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
                log(debug, fmt!("addr: %?", addr));
                let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                uv::ll::tcp_connect6(
                    connect_req_ptr,
                    stream_handle_ptr,
                    ptr::addr_of(in_addr),
                    tcp_connect_on_connect_cb)
              }
            };
            match connect_result {
              0i32 => {
                log(debug, ~"tcp_connect successful");
                // reusable data that we'll have for the
                // duration..
                uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                           socket_data_ptr as
                                              *libc::c_void);
                // just so the connect_cb can send the
                // outcome..
                uv::ll::set_data_for_req(connect_req_ptr,
                                         conn_data_ptr);
                log(debug, ~"leaving tcp_connect interact cb...");
                // let tcp_connect_on_connect_cb send on
                // the result_ch, now..
              }
              _ => {
                // immediate connect failure.. probably a garbage
                // ip or somesuch
                let err_data = uv::ll::get_last_err_data(loop_ptr);
                core::comm::send((*conn_data_ptr).result_ch,
B
Brian Anderson 已提交
215
                           ConnFailure(err_data.to_tcp_err()));
216 217 218
                uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                               conn_data_ptr);
                uv::ll::close(stream_handle_ptr, stream_error_close_cb);
J
Jeff Olson 已提交
219 220 221
              }
            }
        }
B
Brian Anderson 已提交
222
          _ => {
J
Jeff Olson 已提交
223 224
            // failure to create a tcp handle
            let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
225
            core::comm::send((*conn_data_ptr).result_ch,
B
Brian Anderson 已提交
226
                       ConnFailure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
227 228 229
          }
        }
    };
E
Eric Holk 已提交
230
    match core::comm::recv(result_po) {
B
Brian Anderson 已提交
231
      ConnSuccess => {
232
        log(debug, ~"tcp::connect - received success on result_po");
B
Brian Anderson 已提交
233
        result::Ok(TcpSocket(socket_data))
J
Jeff Olson 已提交
234
      }
B
Brian Anderson 已提交
235
      ConnFailure(err_data) => {
E
Eric Holk 已提交
236
        core::comm::recv(closed_signal_po);
237
        log(debug, ~"tcp::connect - received failure on result_po");
238 239 240
        // still have to free the malloc'd stream handle..
        rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                           as *libc::c_void);
241
        let tcp_conn_err = match err_data.err_name {
B
Brian Anderson 已提交
242 243
          ~"ECONNREFUSED" => ConnectionRefused,
          _ => GenericConnectErr(err_data.err_name, err_data.err_msg)
244
        };
245
        result::Err(tcp_conn_err)
J
Jeff Olson 已提交
246 247 248
      }
    }
}
249

250 251 252 253 254 255
/**
 * Write binary data to a tcp stream; Blocks until operation completes
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
256
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
257 258 259 260 261 262 263
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `result` object with a `nil` value as the `ok` variant, or a
 * `tcp_err_data` value as the `err` variant
 */
B
Brian Anderson 已提交
264 265
fn write(sock: TcpSocket, raw_write_data: ~[u8])
    -> result::Result<(), TcpErrData> unsafe {
266
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
267 268 269
    write_common_impl(socket_data_ptr, raw_write_data)
}

270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
/**
 * Write binary data to tcp stream; Returns a `future::future` value
 * immediately
 *
 * # Safety
 *
 * This function can produce unsafe results if:
 *
 * 1. the call to `write_future` is made
 * 2. the `future::future` value returned is never resolved via
 * `future::get`
 * 3. and then the `tcp_socket` passed in to `write_future` leaves
 * scope and is destructed before the task that runs the libuv write
 * operation completes.
 *
 * As such: If using `write_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released write
 * handle. Otherwise, use the blocking `tcp::write` function instead.
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
292
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
293 294 295 296 297 298 299 300
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `future` value that, once the `write` operation completes, resolves to a
 * `result` object with a `nil` value as the `ok` variant, or a `tcp_err_data`
 * value as the `err` variant
 */
B
Brian Anderson 已提交
301 302
fn write_future(sock: TcpSocket, raw_write_data: ~[u8])
    -> future::Future<result::Result<(), TcpErrData>> unsafe {
303
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
304
    do future_spawn {
305
        let data_copy = copy(raw_write_data);
306
        write_common_impl(socket_data_ptr, data_copy)
307
    }
308 309
}

310 311 312 313 314 315 316 317 318 319 320
/**
 * Begin reading binary data from an open TCP connection; used with
 * `read_stop`
 *
 * # Arguments
 *
 * * sock -- a `net::tcp::tcp_socket` for the connection to read from
 *
 * # Returns
 *
 * * A `result` instance that will either contain a
E
Eric Holk 已提交
321 322 323
 * `core::comm::port<tcp_read_result>` that the user can read (and
 * optionally, loop on) from until `read_stop` is called, or a
 * `tcp_err_data` record
324
 */
B
Brian Anderson 已提交
325
fn read_start(sock: TcpSocket)
326
    -> result::Result<comm::Port<
B
Brian Anderson 已提交
327
        result::Result<~[u8], TcpErrData>>, TcpErrData> unsafe {
328
    let socket_data = ptr::addr_of(*(sock.socket_data));
329
    read_start_common_impl(socket_data)
330
}
331

332 333 334 335 336 337 338
/**
 * Stop reading from an open TCP connection; used with `read_start`
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on
 */
B
Brian Anderson 已提交
339 340 341
fn read_stop(sock: TcpSocket,
             -read_port: comm::Port<result::Result<~[u8], TcpErrData>>) ->
    result::Result<(), TcpErrData> unsafe {
P
Paul Stansifer 已提交
342
    log(debug, fmt!("taking the read_port out of commission %?", read_port));
343
    let socket_data = ptr::addr_of(*sock.socket_data);
344 345 346
    read_stop_common_impl(socket_data)
}

347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
/**
 * Reads a single chunk of data from `tcp_socket`; block until data/error
 * recv'd
 *
 * Does a blocking read operation for a single chunk of data from a
 * `tcp_socket` until a data arrives or an error is received. The provided
 * `timeout_msecs` value is used to raise an error if the timeout period
 * passes without any data received.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
B
Brian Anderson 已提交
362 363
fn read(sock: TcpSocket, timeout_msecs: uint)
    -> result::Result<~[u8],TcpErrData> {
364
    let socket_data = ptr::addr_of(*(sock.socket_data));
365 366 367
    read_common_impl(socket_data, timeout_msecs)
}

368
/**
369
 * Reads a single chunk of data; returns a `future::future<~[u8]>`
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396
 * immediately
 *
 * Does a non-blocking read operation for a single chunk of data from a
 * `tcp_socket` and immediately returns a `future` value representing the
 * result. When resolving the returned `future`, it will block until data
 * arrives or an error is received. The provided `timeout_msecs`
 * value is used to raise an error if the timeout period passes without any
 * data received.
 *
 * # Safety
 *
 * This function can produce unsafe results if the call to `read_future` is
 * made, the `future::future` value returned is never resolved via
 * `future::get`, and then the `tcp_socket` passed in to `read_future` leaves
 * scope and is destructed before the task that runs the libuv read
 * operation completes.
 *
 * As such: If using `read_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released read
 * handle. Otherwise, use the blocking `tcp::read` function instead.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
B
Brian Anderson 已提交
397 398
fn read_future(sock: TcpSocket, timeout_msecs: uint)
    -> future::Future<result::Result<~[u8],TcpErrData>> {
399
    let socket_data = ptr::addr_of(*(sock.socket_data));
400
    do future_spawn {
401
        read_common_impl(socket_data, timeout_msecs)
402
    }
403
}
404

405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438
/**
 * Bind an incoming client connection to a `net::tcp::tcp_socket`
 *
 * # Notes
 *
 * It is safe to call `net::tcp::accept` _only_ within the context of the
 * `new_connect_cb` callback provided as the final argument to the
 * `net::tcp::listen` function.
 *
 * The `new_conn` opaque value is provided _only_ as the first argument to the
 * `new_connect_cb` provided as a part of `net::tcp::listen`.
 * It can be safely sent to another task but it _must_ be
 * used (via `net::tcp::accept`) before the `new_connect_cb` call it was
 * provided to returns.
 *
 * This implies that a port/chan pair must be used to make sure that the
 * `new_connect_cb` call blocks until an attempt to create a
 * `net::tcp::tcp_socket` is completed.
 *
 * # Example
 *
 * Here, the `new_conn` is used in conjunction with `accept` from within
 * a task spawned by the `new_connect_cb` passed into `listen`
 *
 * ~~~~~~~~~~~
 * net::tcp::listen(remote_ip, remote_port, backlog)
 *     // this callback is ran once after the connection is successfully
 *     // set up
 *     {|kill_ch|
 *       // pass the kill_ch to your main loop or wherever you want
 *       // to be able to externally kill the server from
 *     }
 *     // this callback is ran when a new connection arrives
 *     {|new_conn, kill_ch|
E
Eric Holk 已提交
439 440
 *     let cont_po = core::comm::port::<option<tcp_err_data>>();
 *     let cont_ch = core::comm::chan(cont_po);
441 442 443
 *     task::spawn {||
 *         let accept_result = net::tcp::accept(new_conn);
 *         if accept_result.is_err() {
E
Eric Holk 已提交
444
 *             core::comm::send(cont_ch, result::get_err(accept_result));
445 446 447 448
 *             // fail?
 *         }
 *         else {
 *             let sock = result::get(accept_result);
E
Eric Holk 已提交
449
 *             core::comm::send(cont_ch, true);
450 451 452
 *             // do work here
 *         }
 *     };
E
Eric Holk 已提交
453
 *     match core::comm::recv(cont_po) {
454
 *       // shut down listen()
E
Eric Holk 已提交
455
 *       some(err_data) { core::comm::send(kill_chan, some(err_data)) }
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473
 *       // wait for next connection
 *       none {}
 *     }
 * };
 * ~~~~~~~~~~~
 *
 * # Arguments
 *
 * * `new_conn` - an opaque value used to create a new `tcp_socket`
 *
 * # Returns
 *
 * On success, this function will return a `net::tcp::tcp_socket` as the
 * `ok` variant of a `result`. The `net::tcp::tcp_socket` is anchored within
 * the task that `accept` was called within for its lifetime. On failure,
 * this function will return a `net::tcp::tcp_err_data` record
 * as the `err` variant of a `result`.
 */
B
Brian Anderson 已提交
474 475
fn accept(new_conn: TcpNewConnection)
    -> result::Result<TcpSocket, TcpErrData> unsafe {
476

477
    match new_conn{
B
Brian Anderson 已提交
478
      NewTcpConn(server_handle_ptr) => {
479
        let server_data_ptr = uv::ll::get_data_for_uv_handle(
B
Brian Anderson 已提交
480
            server_handle_ptr) as *TcpListenFcData;
481
        let reader_po = core::comm::Port();
482
        let iotask = (*server_data_ptr).iotask;
483 484 485
        let stream_handle_ptr = malloc_uv_tcp_t();
        *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
        let client_socket_data = @{
486
            reader_po: reader_po,
487
            reader_ch: core::comm::Chan(reader_po),
488
            stream_handle_ptr : stream_handle_ptr,
489 490
            connect_req : uv::ll::connect_t(),
            write_req : uv::ll::write_t(),
491
            iotask : iotask
492
        };
493 494 495
        let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
        let client_stream_handle_ptr =
            (*client_socket_data_ptr).stream_handle_ptr;
496

B
Brian Anderson 已提交
497
        let result_po = core::comm::Port::<Option<TcpErrData>>();
498
        let result_ch = core::comm::Chan(result_po);
499 500 501 502 503 504 505

        // UNSAFE LIBUV INTERACTION BEGIN
        // .. normally this happens within the context of
        // a call to uv::hl::interact.. but we're breaking
        // the rules here because this always has to be
        // called within the context of a listen() new_connect_cb
        // callback (or it will likely fail and drown your cat)
506
        log(debug, ~"in interact cb for tcp::accept");
507 508
        let loop_ptr = uv::ll::get_loop_for_uv_handle(
            server_handle_ptr);
509
        match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
B
Brian Anderson 已提交
510
          0i32 => {
511
            log(debug, ~"uv_tcp_init successful for client stream");
512
            match uv::ll::accept(
513 514
                server_handle_ptr as *libc::c_void,
                client_stream_handle_ptr as *libc::c_void) {
B
Brian Anderson 已提交
515
              0i32 => {
516
                log(debug, ~"successfully accepted client connection");
517
                uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
518 519
                                               client_socket_data_ptr
                                                   as *libc::c_void);
B
Brian Anderson 已提交
520
                core::comm::send(result_ch, None);
521
              }
B
Brian Anderson 已提交
522
              _ => {
523
                log(debug, ~"failed to accept client conn");
B
Brian Anderson 已提交
524
                core::comm::send(result_ch, Some(
525 526 527
                    uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
              }
            }
528
          }
B
Brian Anderson 已提交
529
          _ => {
530
            log(debug, ~"failed to init client stream");
B
Brian Anderson 已提交
531
            core::comm::send(result_ch, Some(
532 533 534 535
                uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
          }
        }
        // UNSAFE LIBUV INTERACTION END
E
Eric Holk 已提交
536
        match core::comm::recv(result_po) {
537
          Some(err_data) => result::Err(err_data),
B
Brian Anderson 已提交
538
          None => result::Ok(TcpSocket(client_socket_data))
539 540 541 542 543
        }
      }
    }
}

544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561
/**
 * Bind to a given IP/port and listen for new connections
 *
 * # Arguments
 *
 * * `host_ip` - a `net::ip::ip_addr` representing a unique IP
 * (versions 4 or 6)
 * * `port` - a uint representing the port to listen on
 * * `backlog` - a uint representing the number of incoming connections
 * to cache in memory
 * * `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
 * * `on_establish_cb` - a callback that is evaluated if/when the listener
 * is successfully established. it takes no parameters
 * * `new_connect_cb` - a callback to be evaluated, on the libuv thread,
 * whenever a client attempts to conect on the provided ip/port. the
 * callback's arguments are:
 *     * `new_conn` - an opaque type that can be passed to
 *     `net::tcp::accept` in order to be converted to a `tcp_socket`.
E
Eric Holk 已提交
562 563
 *     * `kill_ch` - channel of type `core::comm::chan<option<tcp_err_data>>`.
 *     this channel can be used to send a message to cause `listen` to begin
564 565 566 567 568 569 570 571
 *     closing the underlying libuv data structures.
 *
 * # returns
 *
 * a `result` instance containing empty data of type `()` on a
 * successful/normal shutdown, and a `tcp_listen_err_data` enum in the event
 * of listen exiting because of an error
 */
B
Brian Anderson 已提交
572
fn listen(-host_ip: ip::IpAddr, port: uint, backlog: uint,
B
Brian Anderson 已提交
573
          iotask: IoTask,
B
Brian Anderson 已提交
574 575 576 577
          on_establish_cb: fn~(comm::Chan<Option<TcpErrData>>),
          +new_connect_cb: fn~(TcpNewConnection,
                               comm::Chan<Option<TcpErrData>>))
    -> result::Result<(), TcpListenErrData> unsafe {
578
    do listen_common(host_ip, port, backlog, iotask, on_establish_cb)
579
        // on_connect_cb
580
        |move new_connect_cb, handle| unsafe {
581
            let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
B
Brian Anderson 已提交
582 583
                as *TcpListenFcData;
            let new_conn = NewTcpConn(handle);
584 585 586 587 588
            let kill_ch = (*server_data_ptr).kill_ch;
            new_connect_cb(new_conn, kill_ch);
    }
}

B
Brian Anderson 已提交
589
fn listen_common(-host_ip: ip::IpAddr, port: uint, backlog: uint,
B
Brian Anderson 已提交
590
          iotask: IoTask,
B
Brian Anderson 已提交
591
          on_establish_cb: fn~(comm::Chan<Option<TcpErrData>>),
592
          -on_connect_cb: fn~(*uv::ll::uv_tcp_t))
B
Brian Anderson 已提交
593
    -> result::Result<(), TcpListenErrData> unsafe {
594
    let stream_closed_po = core::comm::Port::<()>();
B
Brian Anderson 已提交
595
    let kill_po = core::comm::Port::<Option<TcpErrData>>();
596
    let kill_ch = core::comm::Chan(kill_po);
597 598 599 600
    let server_stream = uv::ll::tcp_t();
    let server_stream_ptr = ptr::addr_of(server_stream);
    let server_data = {
        server_stream_ptr: server_stream_ptr,
601
        stream_closed_ch: core::comm::Chan(stream_closed_po),
602
        kill_ch: kill_ch,
603
        on_connect_cb: on_connect_cb,
604
        iotask: iotask,
605 606 607 608
        mut active: true
    };
    let server_data_ptr = ptr::addr_of(server_data);

E
Eric Holk 已提交
609
    let setup_result = do core::comm::listen |setup_ch| {
J
Jeff Olson 已提交
610
        // this is to address a compiler warning about
611 612 613 614
        // an implicit copy.. it seems that double nested
        // will defeat a move sigil, as is done to the host_ip
        // arg above.. this same pattern works w/o complaint in
        // tcp::connect (because the iotask::interact cb isn't
E
Eric Holk 已提交
615
        // nested within a core::comm::listen block)
616
        let loc_ip = copy(host_ip);
617
        do iotask::interact(iotask) |loop_ptr| unsafe {
618
            match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
B
Brian Anderson 已提交
619
              0i32 => {
620 621 622
                uv::ll::set_data_for_uv_handle(
                    server_stream_ptr,
                    server_data_ptr);
K
Kevin Cantu 已提交
623
                let addr_str = ip::format_addr(&loc_ip);
624
                let bind_result = match loc_ip {
B
Brian Anderson 已提交
625
                  ip::Ipv4(addr) => {
P
Paul Stansifer 已提交
626
                    log(debug, fmt!("addr: %?", addr));
627 628 629 630
                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                    uv::ll::tcp_bind(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
B
Brian Anderson 已提交
631
                  ip::Ipv6(addr) => {
P
Paul Stansifer 已提交
632
                    log(debug, fmt!("addr: %?", addr));
633 634 635 636 637
                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                    uv::ll::tcp_bind6(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
                };
638
                match bind_result {
B
Brian Anderson 已提交
639
                  0i32 => {
640
                    match uv::ll::listen(server_stream_ptr,
641 642
                                       backlog as libc::c_int,
                                       tcp_lfc_on_connection_cb) {
B
Brian Anderson 已提交
643
                      0i32 => core::comm::send(setup_ch, None),
B
Brian Anderson 已提交
644
                      _ => {
645
                        log(debug, ~"failure to uv_listen()");
646
                        let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
647
                        core::comm::send(setup_ch, Some(err_data));
648 649
                      }
                    }
650
                  }
B
Brian Anderson 已提交
651
                  _ => {
652
                    log(debug, ~"failure to uv_tcp_bind");
653
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
654
                    core::comm::send(setup_ch, Some(err_data));
655 656 657
                  }
                }
              }
B
Brian Anderson 已提交
658
              _ => {
659
                log(debug, ~"failure to uv_tcp_init");
660
                let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
661
                core::comm::send(setup_ch, Some(err_data));
662 663
              }
            }
664 665
        };
        setup_ch.recv()
666
    };
667
    match setup_result {
B
Brian Anderson 已提交
668
      Some(err_data) => {
669
        do iotask::interact(iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
670 671
            log(debug, fmt!("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
672 673 674 675
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
        stream_closed_po.recv();
676
        match err_data.err_name {
B
Brian Anderson 已提交
677
          ~"EACCES" => {
678
            log(debug, ~"Got EACCES error");
B
Brian Anderson 已提交
679
            result::Err(AccessDenied)
680
          }
B
Brian Anderson 已提交
681
          ~"EADDRINUSE" => {
682
            log(debug, ~"Got EADDRINUSE error");
B
Brian Anderson 已提交
683
            result::Err(AddressInUse)
684
          }
B
Brian Anderson 已提交
685
          _ => {
P
Paul Stansifer 已提交
686 687
            log(debug, fmt!("Got '%s' '%s' libuv error",
                            err_data.err_name, err_data.err_msg));
688
            result::Err(
B
Brian Anderson 已提交
689
                GenericListenErr(err_data.err_name, err_data.err_msg))
690 691 692
          }
        }
      }
B
Brian Anderson 已提交
693
      None => {
694
        on_establish_cb(kill_ch);
E
Eric Holk 已提交
695
        let kill_result = core::comm::recv(kill_po);
696
        do iotask::interact(iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
697 698
            log(debug, fmt!("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
699 700 701
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
702
        stream_closed_po.recv();
703
        match kill_result {
704
          // some failure post bind/listen
B
Brian Anderson 已提交
705
          Some(err_data) => result::Err(GenericListenErr(err_data.err_name,
B
Brian Anderson 已提交
706
                                                           err_data.err_msg)),
707
          // clean exit
708
          None => result::Ok(())
709 710 711 712 713
        }
      }
    }
}

714 715 716 717 718 719 720 721 722 723 724 725 726 727 728
/**
 * Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`.
 *
 * This function takes ownership of a `net::tcp::tcp_socket`, returning it
 * stored within a buffered wrapper, which can be converted to a `io::reader`
 * or `io::writer`
 *
 * # Arguments
 *
 * * `sock` -- a `net::tcp::tcp_socket` that you want to buffer
 *
 * # Returns
 *
 * A buffered wrapper that you can cast as an `io::reader` or `io::writer`
 */
B
Brian Anderson 已提交
729
fn socket_buf(-sock: TcpSocket) -> TcpSocketBuf {
730
    TcpSocketBuf(@{ sock: move sock, mut buf: ~[] })
731 732
}

733
/// Convenience methods extending `net::tcp::tcp_socket`
B
Brian Anderson 已提交
734
impl TcpSocket {
735
    fn read_start() -> result::Result<comm::Port<
B
Brian Anderson 已提交
736
        result::Result<~[u8], TcpErrData>>, TcpErrData> {
737 738
        read_start(self)
    }
739
    fn read_stop(-read_port:
B
Brian Anderson 已提交
740 741
                 comm::Port<result::Result<~[u8], TcpErrData>>) ->
        result::Result<(), TcpErrData> {
742
        read_stop(self, read_port)
743
    }
744
    fn read(timeout_msecs: uint) ->
B
Brian Anderson 已提交
745
        result::Result<~[u8], TcpErrData> {
746 747 748
        read(self, timeout_msecs)
    }
    fn read_future(timeout_msecs: uint) ->
B
Brian Anderson 已提交
749
        future::Future<result::Result<~[u8], TcpErrData>> {
750 751
        read_future(self, timeout_msecs)
    }
752
    fn write(raw_write_data: ~[u8])
B
Brian Anderson 已提交
753
        -> result::Result<(), TcpErrData> {
754 755
        write(self, raw_write_data)
    }
756
    fn write_future(raw_write_data: ~[u8])
B
Brian Anderson 已提交
757
        -> future::Future<result::Result<(), TcpErrData>> {
758 759
        write_future(self, raw_write_data)
    }
760
}
761

762
/// Implementation of `io::reader` trait for a buffered `net::tcp::tcp_socket`
B
Brian Anderson 已提交
763
impl TcpSocketBuf: io::Reader {
764 765 766 767
    fn read(buf: &[mut u8], len: uint) -> uint {
        // Loop until our buffer has enough data in it for us to read from.
        while self.data.buf.len() < len {
            let read_result = read(self.data.sock, 0u);
768
            if read_result.is_err() {
769
                let err_data = read_result.get_err();
770 771 772 773

                if err_data.err_name == ~"EOF" {
                    break;
                } else {
P
Paul Stansifer 已提交
774 775
                    debug!("ERROR sock_buf as io::reader.read err %? %?",
                           err_data.err_name, err_data.err_msg);
776

B
Brian Anderson 已提交
777
                    return 0;
778
                }
779 780
            }
            else {
781
                vec::push_all(self.data.buf, result::unwrap(read_result));
782 783
            }
        }
784

785
        let count = uint::min(len, self.data.buf.len());
786 787 788 789 790 791 792 793 794

        let mut data = ~[];
        self.data.buf <-> data;

        vec::u8::memcpy(buf, vec::view(data, 0, data.len()), count);

        vec::push_all(self.data.buf, vec::view(data, count, data.len()));

        count
795 796
    }
    fn read_byte() -> int {
797 798
        let bytes = ~[0];
        if self.read(bytes, 1u) == 0 { fail } else { bytes[0] as int }
799 800
    }
    fn unread_byte(amt: int) {
801
        vec::unshift((*(self.data)).buf, amt as u8);
802 803 804 805
    }
    fn eof() -> bool {
        false // noop
    }
806
    fn seek(dist: int, seek: io::SeekStyle) {
P
Paul Stansifer 已提交
807
        log(debug, fmt!("tcp_socket_buf seek stub %? %?", dist, seek));
808 809 810 811 812 813 814
        // noop
    }
    fn tell() -> uint {
        0u // noop
    }
}

815
/// Implementation of `io::reader` trait for a buffered `net::tcp::tcp_socket`
B
Brian Anderson 已提交
816
impl TcpSocketBuf: io::Writer {
817
    fn write(data: &[const u8]) unsafe {
818 819
        let socket_data_ptr =
            ptr::addr_of(*((*(self.data)).sock).socket_data);
820 821
        let w_result = write_common_impl(socket_data_ptr,
                                        vec::slice(data, 0, vec::len(data)));
822
        if w_result.is_err() {
823
            let err_data = w_result.get_err();
P
Paul Stansifer 已提交
824 825
            log(debug, fmt!("ERROR sock_buf as io::writer.writer err: %? %?",
                             err_data.err_name, err_data.err_msg));
826
        }
827
    }
828
    fn seek(dist: int, seek: io::SeekStyle) {
P
Paul Stansifer 已提交
829
      log(debug, fmt!("tcp_socket_buf seek stub %? %?", dist, seek));
830 831 832 833 834 835 836 837
        // noop
    }
    fn tell() -> uint {
        0u
    }
    fn flush() -> int {
        0
    }
838 839
    fn get_type() -> io::WriterType {
        io::File
840
    }
841 842
}

843 844
// INTERNAL API

B
Brian Anderson 已提交
845
fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe {
846 847
    let closed_po = core::comm::Port::<()>();
    let closed_ch = core::comm::Chan(closed_po);
848 849 850 851 852
    let close_data = {
        closed_ch: closed_ch
    };
    let close_data_ptr = ptr::addr_of(close_data);
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
853
    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
854 855
        log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?",
            stream_handle_ptr, loop_ptr));
856 857 858 859
        uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                       close_data_ptr);
        uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
    };
E
Eric Holk 已提交
860
    core::comm::recv(closed_po);
P
Paul Stansifer 已提交
861
    log(debug, fmt!("about to free socket_data at %?", socket_data));
862 863
    rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                       as *libc::c_void);
864
    log(debug, ~"exiting dtor for tcp_socket");
865 866
}

867
// shared implementation for tcp::read
B
Brian Anderson 已提交
868 869
fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
    -> result::Result<~[u8],TcpErrData> unsafe {
870
    log(debug, ~"starting tcp::read");
871
    let iotask = (*socket_data).iotask;
872
    let rs_result = read_start_common_impl(socket_data);
873
    if result::is_err(rs_result) {
874
        let err_data = result::get_err(rs_result);
875
        result::Err(err_data)
876 877
    }
    else {
878
        log(debug, ~"tcp::read before recv_timeout");
879 880
        let read_result = if timeout_msecs > 0u {
            timer::recv_timeout(
881
               iotask, timeout_msecs, result::get(rs_result))
882
        } else {
B
Brian Anderson 已提交
883
            Some(core::comm::recv(result::get(rs_result)))
884
        };
885
        log(debug, ~"tcp::read after recv_timeout");
886
        match read_result {
B
Brian Anderson 已提交
887
          None => {
888
            log(debug, ~"tcp::read: timed out..");
889
            let err_data = {
890 891
                err_name: ~"TIMEOUT",
                err_msg: ~"req timed out"
892 893
            };
            read_stop_common_impl(socket_data);
894
            result::Err(err_data)
895
          }
B
Brian Anderson 已提交
896
          Some(data_result) => {
897
            log(debug, ~"tcp::read got data");
898 899 900 901 902 903 904 905
            read_stop_common_impl(socket_data);
            data_result
          }
        }
    }
}

// shared impl for read_stop
B
Brian Anderson 已提交
906 907
fn read_stop_common_impl(socket_data: *TcpSocketData) ->
    result::Result<(), TcpErrData> unsafe {
908
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
B
Brian Anderson 已提交
909
    let stop_po = core::comm::Port::<Option<TcpErrData>>();
910
    let stop_ch = core::comm::Chan(stop_po);
911
    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
912
        log(debug, ~"in interact cb for tcp::read_stop");
913
        match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
B
Brian Anderson 已提交
914
          0i32 => {
915
            log(debug, ~"successfully called uv_read_stop");
B
Brian Anderson 已提交
916
            core::comm::send(stop_ch, None);
917
          }
B
Brian Anderson 已提交
918
          _ => {
919
            log(debug, ~"failure in calling uv_read_stop");
920
            let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
921
            core::comm::send(stop_ch, Some(err_data.to_tcp_err()));
922 923 924
          }
        }
    };
E
Eric Holk 已提交
925
    match core::comm::recv(stop_po) {
926 927
      Some(err_data) => result::Err(err_data.to_tcp_err()),
      None => result::Ok(())
928 929 930 931
    }
}

// shared impl for read_start
B
Brian Anderson 已提交
932
fn read_start_common_impl(socket_data: *TcpSocketData)
933
    -> result::Result<comm::Port<
B
Brian Anderson 已提交
934
        result::Result<~[u8], TcpErrData>>, TcpErrData> unsafe {
935
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
936 937
    let start_po = core::comm::Port::<Option<uv::ll::uv_err_data>>();
    let start_ch = core::comm::Chan(start_po);
938
    log(debug, ~"in tcp::read_start before interact loop");
939
    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
940
        log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr));
941
        match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
942 943
                               on_alloc_cb,
                               on_tcp_read_cb) {
B
Brian Anderson 已提交
944
          0i32 => {
945
            log(debug, ~"success doing uv_read_start");
B
Brian Anderson 已提交
946
            core::comm::send(start_ch, None);
947
          }
B
Brian Anderson 已提交
948
          _ => {
949
            log(debug, ~"error attempting uv_read_start");
950
            let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
951
            core::comm::send(start_ch, Some(err_data));
952 953 954
          }
        }
    };
E
Eric Holk 已提交
955
    match core::comm::recv(start_po) {
956 957
      Some(err_data) => result::Err(err_data.to_tcp_err()),
      None => result::Ok((*socket_data).reader_po)
958 959 960
    }
}

961 962
// helper to convert a "class" vector of [u8] to a *[uv::ll::uv_buf_t]

963
// shared implementation used by write and write_future
B
Brian Anderson 已提交
964
fn write_common_impl(socket_data_ptr: *TcpSocketData,
965
                     raw_write_data: ~[u8])
B
Brian Anderson 已提交
966
    -> result::Result<(), TcpErrData> unsafe {
967 968 969
    let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
    let stream_handle_ptr =
        (*socket_data_ptr).stream_handle_ptr;
970
    let write_buf_vec =  ~[ uv::ll::buf_init(
971
        vec::unsafe::to_ptr(raw_write_data),
972
        vec::len(raw_write_data)) ];
973
    let write_buf_vec_ptr = ptr::addr_of(write_buf_vec);
B
Brian Anderson 已提交
974
    let result_po = core::comm::Port::<TcpWriteResult>();
975
    let write_data = {
976
        result_ch: core::comm::Chan(result_po)
977 978
    };
    let write_data_ptr = ptr::addr_of(write_data);
979
    do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
980
        log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr));
981
        match uv::ll::write(write_req_ptr,
982 983 984
                          stream_handle_ptr,
                          write_buf_vec_ptr,
                          tcp_write_complete_cb) {
B
Brian Anderson 已提交
985
          0i32 => {
986
            log(debug, ~"uv_write() invoked successfully");
987 988
            uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
          }
B
Brian Anderson 已提交
989
          _ => {
990
            log(debug, ~"error invoking uv_write()");
991
            let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
992
            core::comm::send((*write_data_ptr).result_ch,
B
Brian Anderson 已提交
993
                       TcpWriteError(err_data.to_tcp_err()));
994 995 996
          }
        }
    };
997 998 999 1000
    // FIXME (#2656): Instead of passing unsafe pointers to local data,
    // and waiting here for the write to complete, we should transfer
    // ownership of everything to the I/O task and let it deal with the
    // aftermath, so we don't have to sit here blocking.
E
Eric Holk 已提交
1001
    match core::comm::recv(result_po) {
B
Brian Anderson 已提交
1002 1003
      TcpWriteSuccess => result::Ok(()),
      TcpWriteError(err_data) => result::Err(err_data.to_tcp_err())
1004 1005 1006
    }
}

B
Brian Anderson 已提交
1007 1008
enum TcpNewConnection {
    NewTcpConn(*uv::ll::uv_tcp_t)
1009 1010
}

B
Brian Anderson 已提交
1011
type TcpListenFcData = {
1012
    server_stream_ptr: *uv::ll::uv_tcp_t,
1013
    stream_closed_ch: comm::Chan<()>,
B
Brian Anderson 已提交
1014
    kill_ch: comm::Chan<Option<TcpErrData>>,
1015
    on_connect_cb: fn~(*uv::ll::uv_tcp_t),
B
Brian Anderson 已提交
1016
    iotask: IoTask,
1017 1018 1019
    mut active: bool
};

G
Graydon Hoare 已提交
1020
extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
1021
    let server_data_ptr = uv::ll::get_data_for_uv_handle(
B
Brian Anderson 已提交
1022
        handle) as *TcpListenFcData;
E
Eric Holk 已提交
1023
    core::comm::send((*server_data_ptr).stream_closed_ch, ());
1024 1025
}

G
Graydon Hoare 已提交
1026
extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
1027 1028
                                     status: libc::c_int) unsafe {
    let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
B
Brian Anderson 已提交
1029
        as *TcpListenFcData;
1030
    let kill_ch = (*server_data_ptr).kill_ch;
1031
    if (*server_data_ptr).active {
1032
        match status {
B
Brian Anderson 已提交
1033 1034
          0i32 => (*server_data_ptr).on_connect_cb(handle),
          _ => {
1035
            let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
E
Eric Holk 已提交
1036
            core::comm::send(kill_ch,
B
Brian Anderson 已提交
1037
                       Some(uv::ll::get_last_err_data(loop_ptr)
1038 1039 1040 1041 1042 1043 1044
                            .to_tcp_err()));
            (*server_data_ptr).active = false;
          }
        }
    }
}

1045
fn malloc_uv_tcp_t() -> *uv::ll::uv_tcp_t unsafe {
1046
    rustrt::rust_uv_current_kernel_malloc(
1047
        rustrt::rust_uv_helper_uv_tcp_t_size()) as *uv::ll::uv_tcp_t
1048 1049
}

B
Brian Anderson 已提交
1050 1051 1052
enum TcpConnectResult {
    TcpConnected(TcpSocket),
    TcpConnectError(TcpErrData)
1053 1054
}

B
Brian Anderson 已提交
1055 1056 1057
enum TcpWriteResult {
    TcpWriteSuccess,
    TcpWriteError(TcpErrData)
1058 1059
}

B
Brian Anderson 已提交
1060 1061 1062
enum TcpReadStartResult {
    TcpReadStartSuccess(comm::Port<TcpReadResult>),
    TcpReadStartError(TcpErrData)
1063 1064
}

B
Brian Anderson 已提交
1065 1066 1067 1068
enum TcpReadResult {
    TcpReadData(~[u8]),
    TcpReadDone,
    TcpReadErr(TcpErrData)
1069 1070
}

B
Brian Anderson 已提交
1071 1072
trait ToTcpErr {
    fn to_tcp_err() -> TcpErrData;
1073 1074
}

B
Brian Anderson 已提交
1075 1076
impl uv::ll::uv_err_data: ToTcpErr {
    fn to_tcp_err() -> TcpErrData {
1077 1078 1079 1080
        { err_name: self.err_name, err_msg: self.err_msg }
    }
}

G
Graydon Hoare 已提交
1081
extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
1082 1083
                    nread: libc::ssize_t,
                    ++buf: uv::ll::uv_buf_t) unsafe {
P
Paul Stansifer 已提交
1084 1085
    log(debug, fmt!("entering on_tcp_read_cb stream: %? nread: %?",
                    stream, nread));
1086 1087
    let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
    let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
B
Brian Anderson 已提交
1088
        as *TcpSocketData;
1089
    match nread as int {
1090
      // incoming err.. probably eof
B
Brian Anderson 已提交
1091
      -1 => {
1092
        let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
P
Paul Stansifer 已提交
1093 1094
        log(debug, fmt!("on_tcp_read_cb: incoming err.. name %? msg %?",
                        err_data.err_name, err_data.err_msg));
1095
        let reader_ch = (*socket_data_ptr).reader_ch;
1096
        core::comm::send(reader_ch, result::Err(err_data));
1097 1098
      }
      // do nothing .. unneeded buf
B
Brian Anderson 已提交
1099
      0 => (),
1100
      // have data
B
Brian Anderson 已提交
1101
      _ => {
1102
        // we have data
P
Paul Stansifer 已提交
1103
        log(debug, fmt!("tcp on_read_cb nread: %d", nread as int));
1104
        let reader_ch = (*socket_data_ptr).reader_ch;
1105
        let buf_base = uv::ll::get_base_from_buf(buf);
1106
        let new_bytes = vec::unsafe::from_buf(buf_base, nread as uint);
1107
        core::comm::send(reader_ch, result::Ok(new_bytes));
1108 1109 1110
      }
    }
    uv::ll::free_base_of_buf(buf);
1111
    log(debug, ~"exiting on_tcp_read_cb");
1112 1113
}

G
Graydon Hoare 已提交
1114
extern fn on_alloc_cb(handle: *libc::c_void,
1115
                     ++suggested_size: size_t)
1116
    -> uv::ll::uv_buf_t unsafe {
1117
    log(debug, ~"tcp read on_alloc_cb!");
1118
    let char_ptr = uv::ll::malloc_buf_base_of(suggested_size);
P
Paul Stansifer 已提交
1119
    log(debug, fmt!("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
1120 1121
                     handle,
                     char_ptr as uint,
P
Paul Stansifer 已提交
1122
                     suggested_size as uint));
1123
    uv::ll::buf_init(char_ptr, suggested_size as uint)
1124
}
1125

B
Brian Anderson 已提交
1126
type TcpSocketCloseData = {
1127
    closed_ch: comm::Chan<()>
1128 1129
};

G
Graydon Hoare 已提交
1130
extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
1131
    let data = uv::ll::get_data_for_uv_handle(handle)
B
Brian Anderson 已提交
1132
        as *TcpSocketCloseData;
1133
    let closed_ch = (*data).closed_ch;
E
Eric Holk 已提交
1134
    core::comm::send(closed_ch, ());
1135
    log(debug, ~"tcp_socket_dtor_close_cb exiting..");
1136 1137
}

G
Graydon Hoare 已提交
1138
extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
1139 1140
                              status: libc::c_int) unsafe {
    let write_data_ptr = uv::ll::get_data_for_req(write_req)
B
Brian Anderson 已提交
1141
        as *WriteReqData;
1142
    if status == 0i32 {
1143
        log(debug, ~"successful write complete");
B
Brian Anderson 已提交
1144
        core::comm::send((*write_data_ptr).result_ch, TcpWriteSuccess);
1145
    } else {
1146 1147 1148 1149
        let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
            write_req);
        let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
1150
        log(debug, ~"failure to write");
E
Eric Holk 已提交
1151
        core::comm::send((*write_data_ptr).result_ch,
B
Brian Anderson 已提交
1152
                         TcpWriteError(err_data));
1153 1154 1155
    }
}

B
Brian Anderson 已提交
1156 1157
type WriteReqData = {
    result_ch: comm::Chan<TcpWriteResult>
1158 1159
};

B
Brian Anderson 已提交
1160 1161
type ConnectReqData = {
    result_ch: comm::Chan<ConnAttempt>,
1162
    closed_signal_ch: comm::Chan<()>
J
Jeff Olson 已提交
1163 1164
};

G
Graydon Hoare 已提交
1165
extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
J
Jeff Olson 已提交
1166
    let data = uv::ll::get_data_for_uv_handle(handle) as
B
Brian Anderson 已提交
1167
        *ConnectReqData;
E
Eric Holk 已提交
1168
    core::comm::send((*data).closed_signal_ch, ());
P
Paul Stansifer 已提交
1169
    log(debug, fmt!("exiting steam_error_close_cb for %?", handle));
J
Jeff Olson 已提交
1170 1171
}

G
Graydon Hoare 已提交
1172
extern fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
P
Paul Stansifer 已提交
1173
    log(debug, fmt!("closed client tcp handle %?", handle));
J
Jeff Olson 已提交
1174 1175
}

G
Graydon Hoare 已提交
1176
extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
J
Jeff Olson 已提交
1177 1178
                                   status: libc::c_int) unsafe {
    let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
B
Brian Anderson 已提交
1179
                      as *ConnectReqData);
J
Jeff Olson 已提交
1180
    let result_ch = (*conn_data_ptr).result_ch;
P
Paul Stansifer 已提交
1181
    log(debug, fmt!("tcp_connect result_ch %?", result_ch));
J
Jeff Olson 已提交
1182 1183
    let tcp_stream_ptr =
        uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
1184
    match status {
B
Brian Anderson 已提交
1185
      0i32 => {
1186
        log(debug, ~"successful tcp connection!");
B
Brian Anderson 已提交
1187
        core::comm::send(result_ch, ConnSuccess);
J
Jeff Olson 已提交
1188
      }
B
Brian Anderson 已提交
1189
      _ => {
1190
        log(debug, ~"error in tcp_connect_on_connect_cb");
J
Jeff Olson 已提交
1191 1192
        let loop_ptr = uv::ll::get_loop_for_uv_handle(tcp_stream_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
P
Paul Stansifer 已提交
1193 1194
        log(debug, fmt!("err_data %? %?", err_data.err_name,
                        err_data.err_msg));
B
Brian Anderson 已提交
1195
        core::comm::send(result_ch, ConnFailure(err_data));
J
Jeff Olson 已提交
1196 1197 1198 1199 1200
        uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
                                       conn_data_ptr);
        uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
      }
    }
1201
    log(debug, ~"leaving tcp_connect_on_connect_cb");
J
Jeff Olson 已提交
1202 1203
}

B
Brian Anderson 已提交
1204 1205 1206
enum ConnAttempt {
    ConnSuccess,
    ConnFailure(uv::ll::uv_err_data)
J
Jeff Olson 已提交
1207 1208
}

B
Brian Anderson 已提交
1209 1210 1211
type TcpSocketData = {
    reader_po: comm::Port<result::Result<~[u8], TcpErrData>>,
    reader_ch: comm::Chan<result::Result<~[u8], TcpErrData>>,
1212
    stream_handle_ptr: *uv::ll::uv_tcp_t,
J
Jeff Olson 已提交
1213
    connect_req: uv::ll::uv_connect_t,
1214
    write_req: uv::ll::uv_write_t,
B
Brian Anderson 已提交
1215
    iotask: IoTask
J
Jeff Olson 已提交
1216 1217
};

B
Brian Anderson 已提交
1218 1219
type TcpBufferedSocketData = {
    sock: TcpSocket,
1220
    mut buf: ~[u8]
1221
};
J
Jeff Olson 已提交
1222

1223
//#[cfg(test)]
1224
mod test {
1225
    // FIXME don't run on fbsd or linux 32 bit (#2064)
1226 1227 1228 1229 1230 1231 1232 1233 1234 1235
    #[cfg(target_os="win32")]
    #[cfg(target_os="darwin")]
    #[cfg(target_os="linux")]
    mod tcp_ipv4_server_and_client_test {
        #[cfg(target_arch="x86_64")]
        mod impl64 {
            #[test]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1236
            #[test]
1237 1238 1239
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1240 1241 1242 1243 1244 1245 1246
            #[test]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
1247
            }
1248 1249 1250 1251
            #[test]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
            }
1252

1253 1254 1255 1256 1257 1258 1259 1260
        }
        #[cfg(target_arch="x86")]
        mod impl32 {
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1261 1262
            #[test]
            #[ignore(cfg(target_os = "linux"))]
1263 1264 1265
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1266 1267 1268 1269 1270 1271 1272
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            #[ignore(cfg(target_os = "linux"))]
1273
            #[ignore(cfg(windows), reason = "deadlocking bots")]
1274 1275 1276
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
            }
1277 1278 1279 1280
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
1281
            }
1282
        }
1283
    }
1284
    fn impl_gl_tcp_ipv4_server_and_client() {
1285
        let hl_loop = uv::global_loop::get();
1286
        let server_ip = ~"127.0.0.1";
1287
        let server_port = 8888u;
1288 1289
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1290

1291 1292
        let server_result_po = core::comm::Port::<~str>();
        let server_result_ch = core::comm::Chan(server_result_po);
1293

1294 1295
        let cont_po = core::comm::Port::<()>();
        let cont_ch = core::comm::Chan(cont_po);
1296
        // server
1297
        do task::spawn_sched(task::ManualThreads(1u)) {
B
Brian Anderson 已提交
1298
            let actual_req = do comm::listen |server_ch| {
1299 1300 1301 1302
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
1303
                    server_ch,
1304 1305
                    cont_ch,
                    hl_loop)
1306 1307 1308
            };
            server_result_ch.send(actual_req);
        };
E
Eric Holk 已提交
1309
        core::comm::recv(cont_po);
1310
        // client
1311
        log(debug, ~"server started, firing up client..");
E
Eric Holk 已提交
1312
        let actual_resp_result = do core::comm::listen |client_ch| {
1313 1314 1315 1316
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1317 1318
                client_ch,
                hl_loop)
1319
        };
1320
        assert actual_resp_result.is_ok();
1321
        let actual_resp = actual_resp_result.get();
E
Eric Holk 已提交
1322
        let actual_req = core::comm::recv(server_result_po);
P
Paul Stansifer 已提交
1323 1324 1325 1326
        log(debug, fmt!("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, fmt!("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
1327 1328
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
1329
    }
1330
    fn impl_gl_tcp_ipv4_client_error_connection_refused() {
1331
        let hl_loop = uv::global_loop::get();
1332
        let server_ip = ~"127.0.0.1";
1333
        let server_port = 8889u;
1334
        let expected_req = ~"ping";
1335
        // client
1336
        log(debug, ~"firing up client..");
E
Eric Holk 已提交
1337
        let actual_resp_result = do core::comm::listen |client_ch| {
1338 1339 1340 1341 1342 1343 1344
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
                client_ch,
                hl_loop)
        };
1345
        match actual_resp_result.get_err() {
B
Brian Anderson 已提交
1346
          ConnectionRefused => (),
B
Brian Anderson 已提交
1347
          _ => fail ~"unknown error.. expected connection_refused"
1348 1349
        }
    }
1350 1351
    fn impl_gl_tcp_ipv4_server_address_in_use() {
        let hl_loop = uv::global_loop::get();
1352
        let server_ip = ~"127.0.0.1";
1353
        let server_port = 8890u;
1354 1355
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1356

1357 1358
        let server_result_po = core::comm::Port::<~str>();
        let server_result_ch = core::comm::Chan(server_result_po);
1359

1360 1361
        let cont_po = core::comm::Port::<()>();
        let cont_ch = core::comm::Chan(cont_po);
1362
        // server
1363
        do task::spawn_sched(task::ManualThreads(1u)) {
B
Brian Anderson 已提交
1364
            let actual_req = do comm::listen |server_ch| {
1365
                run_tcp_test_server(
1366 1367 1368 1369
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
1370 1371
                    cont_ch,
                    hl_loop)
1372 1373 1374
            };
            server_result_ch.send(actual_req);
        };
E
Eric Holk 已提交
1375
        core::comm::recv(cont_po);
1376 1377 1378 1379 1380 1381
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        // client.. just doing this so that the first server tears down
1382
        log(debug, ~"server started, firing up client..");
E
Eric Holk 已提交
1383
        do core::comm::listen |client_ch| {
1384 1385 1386 1387
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1388 1389
                client_ch,
                hl_loop)
1390
        };
1391
        match listen_err {
B
Brian Anderson 已提交
1392
          AddressInUse => {
1393 1394
            assert true;
          }
B
Brian Anderson 已提交
1395
          _ => {
1396
            fail ~"expected address_in_use listen error,"+
B
Brian Anderson 已提交
1397
                ~"but got a different error varient. check logs.";
1398 1399 1400 1401 1402
          }
        }
    }
    fn impl_gl_tcp_ipv4_server_access_denied() {
        let hl_loop = uv::global_loop::get();
1403
        let server_ip = ~"127.0.0.1";
1404 1405 1406 1407 1408 1409
        let server_port = 80u;
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
1410
        match listen_err {
B
Brian Anderson 已提交
1411
          AccessDenied => {
1412 1413
            assert true;
          }
B
Brian Anderson 已提交
1414
          _ => {
1415 1416
            fail ~"expected address_in_use listen error,"+
                      ~"but got a different error varient. check logs.";
1417 1418 1419
          }
        }
    }
1420
    fn impl_gl_tcp_ipv4_server_client_reader_writer() {
1421 1422 1423
        /*
         XXX: Causes an ICE.

1424
        let iotask = uv::global_loop::get();
1425
        let server_ip = ~"127.0.0.1";
1426
        let server_port = 8891u;
1427 1428
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1429

E
Eric Holk 已提交
1430 1431
        let server_result_po = core::comm::port::<~str>();
        let server_result_ch = core::comm::chan(server_result_po);
1432

E
Eric Holk 已提交
1433 1434
        let cont_po = core::comm::port::<()>();
        let cont_ch = core::comm::chan(cont_po);
1435
        // server
1436
        do task::spawn_sched(task::ManualThreads(1u)) {
1437
            let actual_req = do comm::listen |server_ch| {
1438 1439 1440 1441 1442 1443 1444 1445 1446 1447
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
                    cont_ch,
                    iotask)
            };
            server_result_ch.send(actual_req);
        };
E
Eric Holk 已提交
1448
        core::comm::recv(cont_po);
1449 1450 1451
        // client
        let server_addr = ip::v4::parse_addr(server_ip);
        let conn_result = connect(server_addr, server_port, iotask);
1452
        if result::is_err(conn_result) {
1453 1454 1455
            assert false;
        }
        let sock_buf = @socket_buf(result::unwrap(conn_result));
1456
        buf_write(sock_buf, expected_req);
1457 1458

        // so contrived!
1459
        let actual_resp = do str::as_bytes(expected_resp) |resp_buf| {
1460
            buf_read(sock_buf, vec::len(resp_buf))
1461
        };
1462

E
Eric Holk 已提交
1463
        let actual_req = core::comm::recv(server_result_po);
P
Paul Stansifer 已提交
1464 1465 1466 1467
        log(debug, fmt!("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, fmt!("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
1468 1469
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
1470
        */
1471
    }
1472

1473
    fn buf_write<W:io::Writer>(+w: &W, val: ~str) {
P
Paul Stansifer 已提交
1474
        log(debug, fmt!("BUF_WRITE: val len %?", str::len(val)));
1475
        do str::byte_slice(val) |b_slice| {
P
Paul Stansifer 已提交
1476 1477
            log(debug, fmt!("BUF_WRITE: b_slice len %?",
                            vec::len(b_slice)));
1478
            w.write(b_slice)
1479 1480 1481
        }
    }

1482 1483
    fn buf_read<R:io::Reader>(+r: &R, len: uint) -> ~str {
        let new_bytes = (*r).read_bytes(len);
P
Paul Stansifer 已提交
1484 1485
        log(debug, fmt!("in buf_read.. new_bytes len: %?",
                        vec::len(new_bytes)));
1486 1487
        str::from_bytes(new_bytes)
    }
1488

1489
    fn run_tcp_test_server(server_ip: ~str, server_port: uint, resp: ~str,
1490 1491
                          server_ch: comm::Chan<~str>,
                          cont_ch: comm::Chan<()>,
B
Brian Anderson 已提交
1492
                          iotask: IoTask) -> ~str {
1493 1494 1495
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1496
            |kill_ch| {
P
Paul Stansifer 已提交
1497 1498
                log(debug, fmt!("establish_cb %?",
                    kill_ch));
E
Eric Holk 已提交
1499
                core::comm::send(cont_ch, ());
1500 1501 1502
            },
            // risky to run this on the loop, but some users
            // will want the POWER
1503
            |new_conn, kill_ch| {
1504
            log(debug, ~"SERVER: new connection!");
1505
            do comm::listen |cont_ch| {
1506
                do task::spawn_sched(task::ManualThreads(1u)) {
1507
                    log(debug, ~"SERVER: starting worker for new req");
1508 1509

                    let accept_result = accept(new_conn);
1510
                    log(debug, ~"SERVER: after accept()");
1511
                    if result::is_err(accept_result) {
1512
                        log(debug, ~"SERVER: error accept connection");
1513
                        let err_data = result::get_err(accept_result);
B
Brian Anderson 已提交
1514
                        core::comm::send(kill_ch, Some(err_data));
1515
                        log(debug,
1516
                            ~"SERVER/WORKER: send on err cont ch");
1517 1518 1519 1520
                        cont_ch.send(());
                    }
                    else {
                        log(debug,
1521
                            ~"SERVER/WORKER: send on cont ch");
1522
                        cont_ch.send(());
1523
                        let sock = result::unwrap(move accept_result);
1524 1525
                        log(debug, ~"SERVER: successfully accepted"+
                            ~"connection!");
1526
                        let received_req_bytes = read(sock, 0u);
1527
                        match received_req_bytes {
1528
                          result::Ok(data) => {
1529
                            log(debug, ~"SERVER: got REQ str::from_bytes..");
P
Paul Stansifer 已提交
1530 1531
                            log(debug, fmt!("SERVER: REQ data len: %?",
                                            vec::len(data)));
1532 1533
                            server_ch.send(
                                str::from_bytes(data));
1534
                            log(debug, ~"SERVER: before write");
1535
                            tcp_write_single(sock, str::to_bytes(resp));
1536
                            log(debug, ~"SERVER: after write.. die");
B
Brian Anderson 已提交
1537
                            core::comm::send(kill_ch, None);
1538
                          }
1539
                          result::Err(err_data) => {
P
Paul Stansifer 已提交
1540 1541
                            log(debug, fmt!("SERVER: error recvd: %s %s",
                                err_data.err_name, err_data.err_msg));
B
Brian Anderson 已提交
1542
                            core::comm::send(kill_ch, Some(err_data));
1543
                            server_ch.send(~"");
1544
                          }
1545
                        }
1546
                        log(debug, ~"SERVER: worker spinning down");
1547
                    }
1548
                }
1549
                log(debug, ~"SERVER: waiting to recv on cont_ch");
1550 1551
                cont_ch.recv()
            };
1552
            log(debug, ~"SERVER: recv'd on cont_ch..leaving listen cb");
1553 1554 1555
        });
        // err check on listen_result
        if result::is_err(listen_result) {
1556
            match result::get_err(listen_result) {
B
Brian Anderson 已提交
1557
              GenericListenErr(name, msg) => {
P
Paul Stansifer 已提交
1558 1559
                fail fmt!("SERVER: exited abnormally name %s msg %s",
                                name, msg);
1560
              }
B
Brian Anderson 已提交
1561
              AccessDenied => {
1562
                fail ~"SERVER: exited abnormally, got access denied..";
1563
              }
B
Brian Anderson 已提交
1564
              AddressInUse => {
1565
                fail ~"SERVER: exited abnormally, got address in use...";
1566
              }
1567
            }
1568
        }
1569
        let ret_val = server_ch.recv();
P
Paul Stansifer 已提交
1570
        log(debug, fmt!("SERVER: exited and got return val: '%s'", ret_val));
1571 1572 1573
        ret_val
    }

1574
    fn run_tcp_test_server_fail(server_ip: ~str, server_port: uint,
B
Brian Anderson 已提交
1575
                          iotask: IoTask) -> TcpListenErrData {
1576 1577 1578
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1579
            |kill_ch| {
P
Paul Stansifer 已提交
1580 1581
                log(debug, fmt!("establish_cb %?",
                    kill_ch));
1582
            },
1583
            |new_conn, kill_ch| {
P
Paul Stansifer 已提交
1584 1585
                fail fmt!("SERVER: shouldn't be called.. %? %?",
                           new_conn, kill_ch);
1586 1587
        });
        // err check on listen_result
1588
        if result::is_err(listen_result) {
1589 1590 1591
            result::get_err(listen_result)
        }
        else {
1592
            fail ~"SERVER: did not fail as expected"
1593 1594 1595
        }
    }

1596
    fn run_tcp_test_client(server_ip: ~str, server_port: uint, resp: ~str,
1597
                          client_ch: comm::Chan<~str>,
B
Brian Anderson 已提交
1598
                          iotask: IoTask) -> result::Result<~str,
B
Brian Anderson 已提交
1599
                                                    TcpConnectErrData> {
1600
        let server_ip_addr = ip::v4::parse_addr(server_ip);
1601

1602
        log(debug, ~"CLIENT: starting..");
1603
        let connect_result = connect(server_ip_addr, server_port, iotask);
1604
        if result::is_err(connect_result) {
1605
            log(debug, ~"CLIENT: failed to connect");
1606
            let err_data = result::get_err(connect_result);
1607
            Err(err_data)
1608
        }
1609
        else {
1610
            let sock = result::unwrap(move connect_result);
1611
            let resp_bytes = str::to_bytes(resp);
1612
            tcp_write_single(sock, resp_bytes);
1613
            let read_result = sock.read(0u);
1614
            if read_result.is_err() {
1615
                log(debug, ~"CLIENT: failure to read");
1616
                Ok(~"")
1617
            }
1618 1619 1620
            else {
                client_ch.send(str::from_bytes(read_result.get()));
                let ret_val = client_ch.recv();
P
Paul Stansifer 已提交
1621 1622
                log(debug, fmt!("CLIENT: after client_ch recv ret: '%s'",
                   ret_val));
1623
                Ok(ret_val)
1624 1625
            }
        }
1626 1627
    }

B
Brian Anderson 已提交
1628
    fn tcp_write_single(sock: TcpSocket, val: ~[u8]) {
1629 1630
        let write_result_future = sock.write_future(val);
        let write_result = write_result_future.get();
1631
        if result::is_err(write_result) {
1632
            log(debug, ~"tcp_write_single: write failed!");
1633
            let err_data = result::get_err(write_result);
P
Paul Stansifer 已提交
1634 1635
            log(debug, fmt!("tcp_write_single err name: %s msg: %s",
                err_data.err_name, err_data.err_msg));
1636
            // meh. torn on what to do here.
1637
            fail ~"tcp_write_single failed";
1638
        }
1639
    }
1640
}