net_tcp.rs 59.5 KB
Newer Older
1
//! High-level interface to libuv's TCP functionality
2

P
Patrick Walton 已提交
3 4 5 6
use ip = net_ip;
use uv::iotask;
use uv::iotask::IoTask;
use future_spawn = future::spawn;
J
Jeff Olson 已提交
7 8
// FIXME #1935
// should be able to, but can't atm, replace w/ result::{result, extensions};
P
Patrick Walton 已提交
9 10 11 12
use result::*;
use libc::size_t;
use io::{Reader, ReaderUtil, Writer};
use comm = core::comm;
J
Jeff Olson 已提交
13

14
// tcp interfaces
B
Brian Anderson 已提交
15
export TcpSocket;
16
// buffered socket
17
export TcpSocketBuf, socket_buf;
18
// errors
B
Brian Anderson 已提交
19
export TcpErrData, TcpConnectErrData;
20
// operations on a tcp_socket
21
export write, write_future, read_start, read_stop;
22
// tcp server stuff
23
export listen, accept;
24 25
// tcp client stuff
export connect;
J
Jeff Olson 已提交
26

27
#[nolink]
28
extern mod rustrt {
29
    #[legacy_exports];
30
    fn rust_uv_current_kernel_malloc(size: libc::c_uint) -> *libc::c_void;
31
    fn rust_uv_current_kernel_free(mem: *libc::c_void);
32
    fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
33 34
}

35 36 37 38 39 40 41
/**
 * Encapsulates an open TCP/IP connection through libuv
 *
 * `tcp_socket` is non-copyable/sendable and automagically handles closing the
 * underlying libuv data structures when it goes out of scope. This is the
 * data structure that is used for read/write operations over a TCP stream.
 */
B
Brian Anderson 已提交
42
struct TcpSocket {
43
  socket_data: @TcpSocketData,
44
  drop {
45
    unsafe {
46
        tear_down_socket_data(self.socket_data)
47 48
    }
  }
J
Jeff Olson 已提交
49 50
}

B
Brian Anderson 已提交
51 52 53 54 55 56
/// Wrap already-initialised `socket_data` in a `TcpSocket`; the returned
/// value runs the socket tear-down when it is dropped.
fn TcpSocket(socket_data: @TcpSocketData) -> TcpSocket {
    TcpSocket { socket_data: socket_data }
}

57 58 59 60
/**
 * A buffered wrapper for `net::tcp::tcp_socket`
 *
 * It is created with a call to `net::tcp::socket_buf()` and has impls that
61
 * satisfy both the `io::reader` and `io::writer` traits.
62
 */
B
Brian Anderson 已提交
63
struct TcpSocketBuf {
64
    data: @TcpBufferedSocketData,
B
Brian Anderson 已提交
65 66 67 68 69 70
}

fn TcpSocketBuf(data: @TcpBufferedSocketData) -> TcpSocketBuf {
    TcpSocketBuf {
        data: data
    }
71 72
}

73
/// Contains raw, string-based, error information returned from libuv
B
Brian Anderson 已提交
74
type TcpErrData = {
75 76
    err_name: ~str,
    err_msg: ~str
77
};
78
/// Details returned as part of a `result::err` result from `tcp::listen`
B
Brian Anderson 已提交
79
enum TcpListenErrData {
80 81 82 83
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
B
Brian Anderson 已提交
84
    GenericListenErr(~str, ~str),
85 86 87 88 89 90 91
    /**
     * Failed to bind to the requested IP/Port, because it is already in use.
     *
     * # Possible Causes
     *
     * * Attempting to bind to a port already bound to another listener
     */
B
Brian Anderson 已提交
92
    AddressInUse,
93 94 95 96 97 98 99 100 101 102
    /**
     * Request to bind to an IP/Port was denied by the system.
     *
     * # Possible Causes
     *
     * * Attemping to binding to an IP/Port as a non-Administrator
     *   on Windows Vista+
     * * Attempting to bind, as a non-priv'd
     *   user, to 'privileged' ports (< 1024) on *nix
     */
B
Brian Anderson 已提交
103
    AccessDenied
104
}
105
/// Details returned as part of a `result::err` result from `tcp::connect`
B
Brian Anderson 已提交
106
enum TcpConnectErrData {
107 108 109 110
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
B
Brian Anderson 已提交
111
    GenericConnectErr(~str, ~str),
112
    /// Invalid IP or invalid port
B
Brian Anderson 已提交
113
    ConnectionRefused
114
}
115

116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
/**
 * Initiate a client connection over TCP/IP
 *
 * # Arguments
 *
 * * `input_ip` - The IP address (versions 4 or 6) of the remote host
 * * `port` - the unsigned integer of the desired remote host port
 * * `iotask` - a `uv::iotask` that the tcp request will run on
 *
 * # Returns
 *
 * A `result` that, if the operation succeeds, contains a
 * `net::net::tcp_socket` that can be used to send and receive data to/from
 * the remote host. In the event of failure, a
 * `net::tcp::tcp_connect_err_data` instance will be returned
 */
B
Brian Anderson 已提交
132
fn connect(+input_ip: ip::IpAddr, port: uint,
B
Brian Anderson 已提交
133
           iotask: IoTask)
B
Brian Anderson 已提交
134 135
    -> result::Result<TcpSocket, TcpConnectErrData> unsafe {
    let result_po = core::comm::Port::<ConnAttempt>();
136
    let closed_signal_po = core::comm::Port::<()>();
J
Jeff Olson 已提交
137
    let conn_data = {
138 139
        result_ch: core::comm::Chan(result_po),
        closed_signal_ch: core::comm::Chan(closed_signal_po)
J
Jeff Olson 已提交
140
    };
T
Tim Chevalier 已提交
141
    let conn_data_ptr = ptr::addr_of(&conn_data);
B
Brian Anderson 已提交
142
    let reader_po = core::comm::Port::<result::Result<~[u8], TcpErrData>>();
143 144 145
    let stream_handle_ptr = malloc_uv_tcp_t();
    *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
    let socket_data = @{
146
        reader_po: reader_po,
147
        reader_ch: core::comm::Chan(reader_po),
148 149 150
        stream_handle_ptr: stream_handle_ptr,
        connect_req: uv::ll::connect_t(),
        write_req: uv::ll::write_t(),
151
        iotask: iotask
J
Jeff Olson 已提交
152
    };
T
Tim Chevalier 已提交
153
    let socket_data_ptr = ptr::addr_of(&(*socket_data));
P
Paul Stansifer 已提交
154
    log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
J
Jeff Olson 已提交
155 156
    // get an unsafe representation of our stream_handle_ptr that
    // we can send into the interact cb to be handled in libuv..
P
Paul Stansifer 已提交
157 158
    log(debug, fmt!("stream_handle_ptr outside interact %?",
        stream_handle_ptr));
T
Tim Chevalier 已提交
159
    do iotask::interact(iotask) |move input_ip, loop_ptr| unsafe {
160
        log(debug, ~"in interact cb for tcp client connect..");
P
Paul Stansifer 已提交
161 162
        log(debug, fmt!("stream_handle_ptr in interact %?",
            stream_handle_ptr));
163
        match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
B
Brian Anderson 已提交
164
          0i32 => {
165
            log(debug, ~"tcp_init successful");
166 167
            log(debug, ~"dealing w/ ipv4 connection..");
            let connect_req_ptr =
T
Tim Chevalier 已提交
168
                ptr::addr_of(&((*socket_data_ptr).connect_req));
K
Kevin Cantu 已提交
169
            let addr_str = ip::format_addr(&input_ip);
170
            let connect_result = match input_ip {
171
              ip::Ipv4(ref addr) => {
172 173 174 175 176 177 178 179 180 181
                // have to "recreate" the sockaddr_in/6
                // since the ip_addr discards the port
                // info.. should probably add an additional
                // rust type that actually is closer to
                // what the libuv API expects (ip str + port num)
                log(debug, fmt!("addr: %?", addr));
                let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                uv::ll::tcp_connect(
                    connect_req_ptr,
                    stream_handle_ptr,
T
Tim Chevalier 已提交
182
                    ptr::addr_of(&in_addr),
183 184
                    tcp_connect_on_connect_cb)
              }
185
              ip::Ipv6(ref addr) => {
186 187 188 189 190
                log(debug, fmt!("addr: %?", addr));
                let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                uv::ll::tcp_connect6(
                    connect_req_ptr,
                    stream_handle_ptr,
T
Tim Chevalier 已提交
191
                    ptr::addr_of(&in_addr),
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
                    tcp_connect_on_connect_cb)
              }
            };
            match connect_result {
              0i32 => {
                log(debug, ~"tcp_connect successful");
                // reusable data that we'll have for the
                // duration..
                uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                           socket_data_ptr as
                                              *libc::c_void);
                // just so the connect_cb can send the
                // outcome..
                uv::ll::set_data_for_req(connect_req_ptr,
                                         conn_data_ptr);
                log(debug, ~"leaving tcp_connect interact cb...");
                // let tcp_connect_on_connect_cb send on
                // the result_ch, now..
              }
              _ => {
                // immediate connect failure.. probably a garbage
                // ip or somesuch
                let err_data = uv::ll::get_last_err_data(loop_ptr);
                core::comm::send((*conn_data_ptr).result_ch,
B
Brian Anderson 已提交
216
                           ConnFailure(err_data.to_tcp_err()));
217 218 219
                uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                               conn_data_ptr);
                uv::ll::close(stream_handle_ptr, stream_error_close_cb);
J
Jeff Olson 已提交
220 221 222
              }
            }
        }
B
Brian Anderson 已提交
223
          _ => {
J
Jeff Olson 已提交
224 225
            // failure to create a tcp handle
            let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
226
            core::comm::send((*conn_data_ptr).result_ch,
B
Brian Anderson 已提交
227
                       ConnFailure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
228 229 230
          }
        }
    };
E
Eric Holk 已提交
231
    match core::comm::recv(result_po) {
B
Brian Anderson 已提交
232
      ConnSuccess => {
233
        log(debug, ~"tcp::connect - received success on result_po");
B
Brian Anderson 已提交
234
        result::Ok(TcpSocket(socket_data))
J
Jeff Olson 已提交
235
      }
236
      ConnFailure(ref err_data) => {
E
Eric Holk 已提交
237
        core::comm::recv(closed_signal_po);
238
        log(debug, ~"tcp::connect - received failure on result_po");
239 240 241
        // still have to free the malloc'd stream handle..
        rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                           as *libc::c_void);
242
        let tcp_conn_err = match err_data.err_name {
B
Brian Anderson 已提交
243 244
          ~"ECONNREFUSED" => ConnectionRefused,
          _ => GenericConnectErr(err_data.err_name, err_data.err_msg)
245
        };
246
        result::Err(tcp_conn_err)
J
Jeff Olson 已提交
247 248 249
      }
    }
}
250

251 252 253 254 255 256
/**
 * Write binary data to a tcp stream; Blocks until operation completes
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
257
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
258 259 260 261 262 263 264
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `result` object with a `nil` value as the `ok` variant, or a
 * `tcp_err_data` value as the `err` variant
 */
B
Brian Anderson 已提交
265
fn write(sock: &TcpSocket, raw_write_data: ~[u8])
B
Brian Anderson 已提交
266
    -> result::Result<(), TcpErrData> unsafe {
T
Tim Chevalier 已提交
267
    let socket_data_ptr = ptr::addr_of(&(*(sock.socket_data)));
268 269 270
    write_common_impl(socket_data_ptr, raw_write_data)
}

271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
/**
 * Write binary data to tcp stream; Returns a `future::future` value
 * immediately
 *
 * # Safety
 *
 * This function can produce unsafe results if:
 *
 * 1. the call to `write_future` is made
 * 2. the `future::future` value returned is never resolved via
 * `future::get`
 * 3. and then the `tcp_socket` passed in to `write_future` leaves
 * scope and is destructed before the task that runs the libuv write
 * operation completes.
 *
 * As such: If using `write_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released write
 * handle. Otherwise, use the blocking `tcp::write` function instead.
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
293
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
294 295 296 297 298 299 300 301
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `future` value that, once the `write` operation completes, resolves to a
 * `result` object with a `nil` value as the `ok` variant, or a `tcp_err_data`
 * value as the `err` variant
 */
B
Brian Anderson 已提交
302
fn write_future(sock: &TcpSocket, raw_write_data: ~[u8])
B
Brian Anderson 已提交
303
    -> future::Future<result::Result<(), TcpErrData>> unsafe {
T
Tim Chevalier 已提交
304
    let socket_data_ptr = ptr::addr_of(&(*(sock.socket_data)));
305
    do future_spawn {
306
        let data_copy = copy(raw_write_data);
307
        write_common_impl(socket_data_ptr, data_copy)
308
    }
309 310
}

311 312 313 314 315 316 317 318 319 320 321
/**
 * Begin reading binary data from an open TCP connection; used with
 * `read_stop`
 *
 * # Arguments
 *
 * * sock -- a `net::tcp::tcp_socket` for the connection to read from
 *
 * # Returns
 *
 * * A `result` instance that will either contain a
E
Eric Holk 已提交
322 323 324
 * `core::comm::port<tcp_read_result>` that the user can read (and
 * optionally, loop on) from until `read_stop` is called, or a
 * `tcp_err_data` record
325
 */
B
Brian Anderson 已提交
326
fn read_start(sock: &TcpSocket)
327
    -> result::Result<comm::Port<
B
Brian Anderson 已提交
328
        result::Result<~[u8], TcpErrData>>, TcpErrData> unsafe {
T
Tim Chevalier 已提交
329
    let socket_data = ptr::addr_of(&(*(sock.socket_data)));
330
    read_start_common_impl(socket_data)
331
}
332

333 334 335 336 337 338 339
/**
 * Stop reading from an open TCP connection; used with `read_start`
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on
 */
B
Brian Anderson 已提交
340 341
fn read_stop(sock: &TcpSocket,
             +read_port: comm::Port<result::Result<~[u8], TcpErrData>>) ->
B
Brian Anderson 已提交
342
    result::Result<(), TcpErrData> unsafe {
P
Paul Stansifer 已提交
343
    log(debug, fmt!("taking the read_port out of commission %?", read_port));
T
Tim Chevalier 已提交
344
    let socket_data = ptr::addr_of(&(*sock.socket_data));
345 346 347
    read_stop_common_impl(socket_data)
}

348 349 350 351 352 353 354 355 356 357 358 359 360 361 362
/**
 * Reads a single chunk of data from `tcp_socket`; block until data/error
 * recv'd
 *
 * Does a blocking read operation for a single chunk of data from a
 * `tcp_socket` until a data arrives or an error is received. The provided
 * `timeout_msecs` value is used to raise an error if the timeout period
 * passes without any data received.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
B
Brian Anderson 已提交
363
fn read(sock: &TcpSocket, timeout_msecs: uint)
B
Brian Anderson 已提交
364
    -> result::Result<~[u8],TcpErrData> {
T
Tim Chevalier 已提交
365
    let socket_data = ptr::addr_of(&(*(sock.socket_data)));
366 367 368
    read_common_impl(socket_data, timeout_msecs)
}

369
/**
370
 * Reads a single chunk of data; returns a `future::future<~[u8]>`
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397
 * immediately
 *
 * Does a non-blocking read operation for a single chunk of data from a
 * `tcp_socket` and immediately returns a `future` value representing the
 * result. When resolving the returned `future`, it will block until data
 * arrives or an error is received. The provided `timeout_msecs`
 * value is used to raise an error if the timeout period passes without any
 * data received.
 *
 * # Safety
 *
 * This function can produce unsafe results if the call to `read_future` is
 * made, the `future::future` value returned is never resolved via
 * `future::get`, and then the `tcp_socket` passed in to `read_future` leaves
 * scope and is destructed before the task that runs the libuv read
 * operation completes.
 *
 * As such: If using `read_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released read
 * handle. Otherwise, use the blocking `tcp::read` function instead.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
B
Brian Anderson 已提交
398
fn read_future(sock: &TcpSocket, timeout_msecs: uint)
B
Brian Anderson 已提交
399
    -> future::Future<result::Result<~[u8],TcpErrData>> {
T
Tim Chevalier 已提交
400
    let socket_data = ptr::addr_of(&(*(sock.socket_data)));
401
    do future_spawn {
402
        read_common_impl(socket_data, timeout_msecs)
403
    }
404
}
405

406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
/**
 * Bind an incoming client connection to a `net::tcp::tcp_socket`
 *
 * # Notes
 *
 * It is safe to call `net::tcp::accept` _only_ within the context of the
 * `new_connect_cb` callback provided as the final argument to the
 * `net::tcp::listen` function.
 *
 * The `new_conn` opaque value is provided _only_ as the first argument to the
 * `new_connect_cb` provided as a part of `net::tcp::listen`.
 * It can be safely sent to another task but it _must_ be
 * used (via `net::tcp::accept`) before the `new_connect_cb` call it was
 * provided to returns.
 *
 * This implies that a port/chan pair must be used to make sure that the
 * `new_connect_cb` call blocks until an attempt to create a
 * `net::tcp::tcp_socket` is completed.
 *
 * # Example
 *
 * Here, the `new_conn` is used in conjunction with `accept` from within
 * a task spawned by the `new_connect_cb` passed into `listen`
 *
 * ~~~~~~~~~~~
 * net::tcp::listen(remote_ip, remote_port, backlog)
 *     // this callback is ran once after the connection is successfully
 *     // set up
 *     {|kill_ch|
 *       // pass the kill_ch to your main loop or wherever you want
 *       // to be able to externally kill the server from
 *     }
 *     // this callback is ran when a new connection arrives
 *     {|new_conn, kill_ch|
E
Eric Holk 已提交
440 441
 *     let cont_po = core::comm::port::<option<tcp_err_data>>();
 *     let cont_ch = core::comm::chan(cont_po);
442 443 444
 *     task::spawn {||
 *         let accept_result = net::tcp::accept(new_conn);
 *         if accept_result.is_err() {
E
Eric Holk 已提交
445
 *             core::comm::send(cont_ch, result::get_err(accept_result));
446 447 448 449
 *             // fail?
 *         }
 *         else {
 *             let sock = result::get(accept_result);
E
Eric Holk 已提交
450
 *             core::comm::send(cont_ch, true);
451 452 453
 *             // do work here
 *         }
 *     };
E
Eric Holk 已提交
454
 *     match core::comm::recv(cont_po) {
455
 *       // shut down listen()
E
Eric Holk 已提交
456
 *       some(err_data) { core::comm::send(kill_chan, some(err_data)) }
457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
 *       // wait for next connection
 *       none {}
 *     }
 * };
 * ~~~~~~~~~~~
 *
 * # Arguments
 *
 * * `new_conn` - an opaque value used to create a new `tcp_socket`
 *
 * # Returns
 *
 * On success, this function will return a `net::tcp::tcp_socket` as the
 * `ok` variant of a `result`. The `net::tcp::tcp_socket` is anchored within
 * the task that `accept` was called within for its lifetime. On failure,
 * this function will return a `net::tcp::tcp_err_data` record
 * as the `err` variant of a `result`.
 */
B
Brian Anderson 已提交
475 476
fn accept(new_conn: TcpNewConnection)
    -> result::Result<TcpSocket, TcpErrData> unsafe {
477

478
    match new_conn{
B
Brian Anderson 已提交
479
      NewTcpConn(server_handle_ptr) => {
480
        let server_data_ptr = uv::ll::get_data_for_uv_handle(
B
Brian Anderson 已提交
481
            server_handle_ptr) as *TcpListenFcData;
482
        let reader_po = core::comm::Port();
483
        let iotask = (*server_data_ptr).iotask;
484 485 486
        let stream_handle_ptr = malloc_uv_tcp_t();
        *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
        let client_socket_data = @{
487
            reader_po: reader_po,
488
            reader_ch: core::comm::Chan(reader_po),
489
            stream_handle_ptr : stream_handle_ptr,
490 491
            connect_req : uv::ll::connect_t(),
            write_req : uv::ll::write_t(),
492
            iotask : iotask
493
        };
T
Tim Chevalier 已提交
494
        let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data));
495 496
        let client_stream_handle_ptr =
            (*client_socket_data_ptr).stream_handle_ptr;
497

B
Brian Anderson 已提交
498
        let result_po = core::comm::Port::<Option<TcpErrData>>();
499
        let result_ch = core::comm::Chan(result_po);
500 501 502 503 504 505 506

        // UNSAFE LIBUV INTERACTION BEGIN
        // .. normally this happens within the context of
        // a call to uv::hl::interact.. but we're breaking
        // the rules here because this always has to be
        // called within the context of a listen() new_connect_cb
        // callback (or it will likely fail and drown your cat)
507
        log(debug, ~"in interact cb for tcp::accept");
508 509
        let loop_ptr = uv::ll::get_loop_for_uv_handle(
            server_handle_ptr);
510
        match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
B
Brian Anderson 已提交
511
          0i32 => {
512
            log(debug, ~"uv_tcp_init successful for client stream");
513
            match uv::ll::accept(
514 515
                server_handle_ptr as *libc::c_void,
                client_stream_handle_ptr as *libc::c_void) {
B
Brian Anderson 已提交
516
              0i32 => {
517
                log(debug, ~"successfully accepted client connection");
518
                uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
519 520
                                               client_socket_data_ptr
                                                   as *libc::c_void);
B
Brian Anderson 已提交
521
                core::comm::send(result_ch, None);
522
              }
B
Brian Anderson 已提交
523
              _ => {
524
                log(debug, ~"failed to accept client conn");
B
Brian Anderson 已提交
525
                core::comm::send(result_ch, Some(
526 527 528
                    uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
              }
            }
529
          }
B
Brian Anderson 已提交
530
          _ => {
531
            log(debug, ~"failed to init client stream");
B
Brian Anderson 已提交
532
            core::comm::send(result_ch, Some(
533 534 535 536
                uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
          }
        }
        // UNSAFE LIBUV INTERACTION END
E
Eric Holk 已提交
537
        match core::comm::recv(result_po) {
538
          Some(copy err_data) => result::Err(err_data),
B
Brian Anderson 已提交
539
          None => result::Ok(TcpSocket(client_socket_data))
540 541 542 543 544
        }
      }
    }
}

545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562
/**
 * Bind to a given IP/port and listen for new connections
 *
 * # Arguments
 *
 * * `host_ip` - a `net::ip::ip_addr` representing a unique IP
 * (versions 4 or 6)
 * * `port` - a uint representing the port to listen on
 * * `backlog` - a uint representing the number of incoming connections
 * to cache in memory
 * * `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
 * * `on_establish_cb` - a callback that is evaluated if/when the listener
 * is successfully established. it takes no parameters
 * * `new_connect_cb` - a callback to be evaluated, on the libuv thread,
 * whenever a client attempts to conect on the provided ip/port. the
 * callback's arguments are:
 *     * `new_conn` - an opaque type that can be passed to
 *     `net::tcp::accept` in order to be converted to a `tcp_socket`.
E
Eric Holk 已提交
563 564
 *     * `kill_ch` - channel of type `core::comm::chan<option<tcp_err_data>>`.
 *     this channel can be used to send a message to cause `listen` to begin
565 566 567 568 569 570 571 572
 *     closing the underlying libuv data structures.
 *
 * # returns
 *
 * a `result` instance containing empty data of type `()` on a
 * successful/normal shutdown, and a `tcp_listen_err_data` enum in the event
 * of listen exiting because of an error
 */
B
Brian Anderson 已提交
573
fn listen(+host_ip: ip::IpAddr, port: uint, backlog: uint,
B
Brian Anderson 已提交
574
          iotask: IoTask,
B
Brian Anderson 已提交
575
          +on_establish_cb: fn~(comm::Chan<Option<TcpErrData>>),
B
Brian Anderson 已提交
576 577 578
          +new_connect_cb: fn~(TcpNewConnection,
                               comm::Chan<Option<TcpErrData>>))
    -> result::Result<(), TcpListenErrData> unsafe {
T
Tim Chevalier 已提交
579
    do listen_common(move host_ip, port, backlog, iotask, on_establish_cb)
580
        // on_connect_cb
581
        |move new_connect_cb, handle| unsafe {
582
            let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
B
Brian Anderson 已提交
583 584
                as *TcpListenFcData;
            let new_conn = NewTcpConn(handle);
585 586 587 588 589
            let kill_ch = (*server_data_ptr).kill_ch;
            new_connect_cb(new_conn, kill_ch);
    }
}

B
Brian Anderson 已提交
590
fn listen_common(+host_ip: ip::IpAddr, port: uint, backlog: uint,
B
Brian Anderson 已提交
591
          iotask: IoTask,
B
Brian Anderson 已提交
592 593
          +on_establish_cb: fn~(comm::Chan<Option<TcpErrData>>),
          +on_connect_cb: fn~(*uv::ll::uv_tcp_t))
B
Brian Anderson 已提交
594
    -> result::Result<(), TcpListenErrData> unsafe {
595
    let stream_closed_po = core::comm::Port::<()>();
B
Brian Anderson 已提交
596
    let kill_po = core::comm::Port::<Option<TcpErrData>>();
597
    let kill_ch = core::comm::Chan(kill_po);
598
    let server_stream = uv::ll::tcp_t();
T
Tim Chevalier 已提交
599
    let server_stream_ptr = ptr::addr_of(&server_stream);
600 601
    let server_data = {
        server_stream_ptr: server_stream_ptr,
602
        stream_closed_ch: core::comm::Chan(stream_closed_po),
603
        kill_ch: kill_ch,
T
Tim Chevalier 已提交
604
        on_connect_cb: move on_connect_cb,
605
        iotask: iotask,
606 607
        mut active: true
    };
T
Tim Chevalier 已提交
608
    let server_data_ptr = ptr::addr_of(&server_data);
609

E
Eric Holk 已提交
610
    let setup_result = do core::comm::listen |setup_ch| {
J
Jeff Olson 已提交
611
        // this is to address a compiler warning about
612 613 614 615
        // an implicit copy.. it seems that double nested
        // will defeat a move sigil, as is done to the host_ip
        // arg above.. this same pattern works w/o complaint in
        // tcp::connect (because the iotask::interact cb isn't
E
Eric Holk 已提交
616
        // nested within a core::comm::listen block)
617
        let loc_ip = copy(host_ip);
T
Tim Chevalier 已提交
618
        do iotask::interact(iotask) |move loc_ip, loop_ptr| unsafe {
619
            match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
B
Brian Anderson 已提交
620
              0i32 => {
621 622 623
                uv::ll::set_data_for_uv_handle(
                    server_stream_ptr,
                    server_data_ptr);
K
Kevin Cantu 已提交
624
                let addr_str = ip::format_addr(&loc_ip);
625
                let bind_result = match loc_ip {
626
                  ip::Ipv4(ref addr) => {
P
Paul Stansifer 已提交
627
                    log(debug, fmt!("addr: %?", addr));
628 629
                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                    uv::ll::tcp_bind(server_stream_ptr,
T
Tim Chevalier 已提交
630
                                     ptr::addr_of(&in_addr))
631
                  }
632
                  ip::Ipv6(ref addr) => {
P
Paul Stansifer 已提交
633
                    log(debug, fmt!("addr: %?", addr));
634 635
                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                    uv::ll::tcp_bind6(server_stream_ptr,
T
Tim Chevalier 已提交
636
                                     ptr::addr_of(&in_addr))
637 638
                  }
                };
639
                match bind_result {
B
Brian Anderson 已提交
640
                  0i32 => {
641
                    match uv::ll::listen(server_stream_ptr,
642 643
                                       backlog as libc::c_int,
                                       tcp_lfc_on_connection_cb) {
B
Brian Anderson 已提交
644
                      0i32 => core::comm::send(setup_ch, None),
B
Brian Anderson 已提交
645
                      _ => {
646
                        log(debug, ~"failure to uv_listen()");
647
                        let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
648
                        core::comm::send(setup_ch, Some(err_data));
649 650
                      }
                    }
651
                  }
B
Brian Anderson 已提交
652
                  _ => {
653
                    log(debug, ~"failure to uv_tcp_bind");
654
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
655
                    core::comm::send(setup_ch, Some(err_data));
656 657 658
                  }
                }
              }
B
Brian Anderson 已提交
659
              _ => {
660
                log(debug, ~"failure to uv_tcp_init");
661
                let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
662
                core::comm::send(setup_ch, Some(err_data));
663 664
              }
            }
665 666
        };
        setup_ch.recv()
667
    };
668
    match setup_result {
669
      Some(ref err_data) => {
670
        do iotask::interact(iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
671 672
            log(debug, fmt!("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
673 674 675 676
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
        stream_closed_po.recv();
677
        match err_data.err_name {
B
Brian Anderson 已提交
678
          ~"EACCES" => {
679
            log(debug, ~"Got EACCES error");
B
Brian Anderson 已提交
680
            result::Err(AccessDenied)
681
          }
B
Brian Anderson 已提交
682
          ~"EADDRINUSE" => {
683
            log(debug, ~"Got EADDRINUSE error");
B
Brian Anderson 已提交
684
            result::Err(AddressInUse)
685
          }
B
Brian Anderson 已提交
686
          _ => {
P
Paul Stansifer 已提交
687 688
            log(debug, fmt!("Got '%s' '%s' libuv error",
                            err_data.err_name, err_data.err_msg));
689
            result::Err(
B
Brian Anderson 已提交
690
                GenericListenErr(err_data.err_name, err_data.err_msg))
691 692 693
          }
        }
      }
B
Brian Anderson 已提交
694
      None => {
695
        on_establish_cb(kill_ch);
E
Eric Holk 已提交
696
        let kill_result = core::comm::recv(kill_po);
697
        do iotask::interact(iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
698 699
            log(debug, fmt!("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
700 701 702
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
703
        stream_closed_po.recv();
704
        match kill_result {
705
          // some failure post bind/listen
B
Brian Anderson 已提交
706 707 708
          Some(ref err_data) => result::Err(GenericListenErr(
              err_data.err_name,
              err_data.err_msg)),
709
          // clean exit
710
          None => result::Ok(())
711 712 713 714 715
        }
      }
    }
}

716 717 718 719 720 721 722 723 724 725 726 727 728 729 730
/**
 * Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`.
 *
 * This function takes ownership of a `net::tcp::tcp_socket`, returning it
 * stored within a buffered wrapper, which can be converted to a `io::reader`
 * or `io::writer`
 *
 * # Arguments
 *
 * * `sock` -- a `net::tcp::tcp_socket` that you want to buffer
 *
 * # Returns
 *
 * A buffered wrapper that you can cast as an `io::reader` or `io::writer`
 */
B
Brian Anderson 已提交
731
fn socket_buf(+sock: TcpSocket) -> TcpSocketBuf {
732
    // Move the socket into a managed record whose byte buffer starts out
    // empty, and hand that record to the `TcpSocketBuf` constructor.
    TcpSocketBuf(@{ sock: move sock, mut buf: ~[] })
733 734
}

735
/// Convenience methods extending `net::tcp::TcpSocket`
B
Brian Anderson 已提交
736
// Method-call sugar: every method below forwards to the module-level
// function of the same name, passing a borrowed `self` as the socket.
impl TcpSocket {
737
    // Forwards to `read_start(&self)`; on success the result carries the
    // port on which read results are delivered.
    fn read_start() -> result::Result<comm::Port<
B
Brian Anderson 已提交
738
        result::Result<~[u8], TcpErrData>>, TcpErrData> {
B
Brian Anderson 已提交
739
        read_start(&self)
740
    }
B
Brian Anderson 已提交
741
    // Forwards to `read_stop`, moving `read_port` into the call.
    fn read_stop(+read_port:
B
Brian Anderson 已提交
742 743
                 comm::Port<result::Result<~[u8], TcpErrData>>) ->
        result::Result<(), TcpErrData> {
B
Brian Anderson 已提交
744
        read_stop(&self, move read_port)
745
    }
746
    // Forwards to `read(&self, timeout_msecs)`.
    fn read(timeout_msecs: uint) ->
B
Brian Anderson 已提交
747
        result::Result<~[u8], TcpErrData> {
B
Brian Anderson 已提交
748
        read(&self, timeout_msecs)
749 750
    }
    // Forwards to `read_future(&self, timeout_msecs)`; the read result is
    // wrapped in a future.
    fn read_future(timeout_msecs: uint) ->
B
Brian Anderson 已提交
751
        future::Future<result::Result<~[u8], TcpErrData>> {
B
Brian Anderson 已提交
752
        read_future(&self, timeout_msecs)
753
    }
754
    // Forwards to `write(&self, raw_write_data)`.
    fn write(raw_write_data: ~[u8])
B
Brian Anderson 已提交
755
        -> result::Result<(), TcpErrData> {
B
Brian Anderson 已提交
756
        write(&self, raw_write_data)
757
    }
758
    // Forwards to `write_future(&self, raw_write_data)`; the write result is
    // wrapped in a future.
    fn write_future(raw_write_data: ~[u8])
B
Brian Anderson 已提交
759
        -> future::Future<result::Result<(), TcpErrData>> {
B
Brian Anderson 已提交
760
        write_future(&self, raw_write_data)
761
    }
762
}
763

764
/// Implementation of the `io::Reader` trait for `net::tcp::TcpSocketBuf`
B
Brian Anderson 已提交
765
impl TcpSocketBuf: io::Reader {
766 767 768
    // Copies up to `len` bytes from the internal buffer into `buf` and
    // returns how many were copied. The buffer is topped up from the socket
    // until it holds at least `len` bytes or an "EOF" error is reported.
    // Any other read error is logged and makes the call return 0; bytes
    // already buffered are kept for a later call.
    fn read(buf: &[mut u8], len: uint) -> uint {
        // Loop until our buffer has enough data in it for us to read from.
        while self.data.buf.len() < len {
B
Brian Anderson 已提交
769
            // Timeout argument of 0u. NOTE(review): in `read_common_impl` a
            // zero timeout takes the plain blocking recv path; presumably
            // `read` delegates there -- confirm.
            let read_result = read(&self.data.sock, 0u);
770
            if read_result.is_err() {
771
                let err_data = read_result.get_err();
772 773 774 775

                if err_data.err_name == ~"EOF" {
                    // EOF: stop refilling and serve whatever is buffered.
                    break;
                } else {
P
Paul Stansifer 已提交
776 777
                    debug!("ERROR sock_buf as io::reader.read err %? %?",
                           err_data.err_name, err_data.err_msg);
778

B
Brian Anderson 已提交
779
                    return 0;
780
                }
781 782
            }
            else {
783
                self.data.buf.push_all(result::unwrap(read_result));
784 785
            }
        }
786

787
        // May be less than `len` when the loop ended on EOF.
        let count = uint::min(len, self.data.buf.len());
788 789 790 791

        // Swap the buffered bytes out into a local (leaving the field
        // empty) so they can be sliced while the field is refilled below.
        let mut data = ~[];
        self.data.buf <-> data;

E
Erick Tryzelaar 已提交
792
        vec::bytes::memcpy(buf, vec::view(data, 0, data.len()), count);
793

794
        // Put the bytes that were not handed out back into the buffer.
        self.data.buf.push_all(vec::view(data, count, data.len()));
795 796

        count
797 798
    }
    // Reads one byte through `read` and returns it as an int.
    // NOTE(review): `read` returns 0 both on a non-EOF error and on EOF with
    // an empty buffer, so either case fails the task here instead of
    // returning a sentinel -- confirm this is intended.
    fn read_byte() -> int {
799
        let mut bytes = ~[0];
800
        if self.read(bytes, 1u) == 0 { fail } else { bytes[0] as int }
801 802
    }
    // Puts `amt` (cast to u8) back into the buffer via `unshift`.
    fn unread_byte(amt: int) {
N
Niko Matsakis 已提交
803
        self.data.buf.unshift(amt as u8);
804 805 806 807
    }
    // Stub: always reports "not at EOF".
    fn eof() -> bool {
        false // noop
    }
808
    // Stub: only logs its arguments.
    fn seek(dist: int, seek: io::SeekStyle) {
P
Paul Stansifer 已提交
809
        log(debug, fmt!("tcp_socket_buf seek stub %? %?", dist, seek));
810 811 812 813 814 815 816
        // noop
    }
    // Stub: always reports position 0.
    fn tell() -> uint {
        0u // noop
    }
}

817
/// Implementation of the `io::Writer` trait for `net::tcp::TcpSocketBuf`
B
Brian Anderson 已提交
818
impl TcpSocketBuf: io::Writer {
819
    // Writes all of `data` to the wrapped socket via `write_common_impl`.
    // A failed write is only logged at debug level; nothing is reported
    // back to the caller.
    fn write(data: &[const u8]) unsafe {
820
        // Raw pointer to the socket data held by the wrapped `TcpSocket`.
        let socket_data_ptr =
T
Tim Chevalier 已提交
821
            ptr::addr_of(&(*((*(self.data)).sock).socket_data));
822 823
        let w_result = write_common_impl(socket_data_ptr,
                                        vec::slice(data, 0, vec::len(data)));
824
        if w_result.is_err() {
825
            let err_data = w_result.get_err();
P
Paul Stansifer 已提交
826 827
            log(debug, fmt!("ERROR sock_buf as io::writer.writer err: %? %?",
                             err_data.err_name, err_data.err_msg));
828
        }
829
    }
830
    // Stub: only logs its arguments.
    fn seek(dist: int, seek: io::SeekStyle) {
P
Paul Stansifer 已提交
831
      log(debug, fmt!("tcp_socket_buf seek stub %? %?", dist, seek));
832 833 834 835 836 837 838 839
        // noop
    }
    // Stub: always reports position 0.
    fn tell() -> uint {
        0u
    }
    // Stub: does nothing and returns 0.
    fn flush() -> int {
        0
    }
840 841
    // Reports the writer type as `io::File`.
    fn get_type() -> io::WriterType {
        io::File
842
    }
843 844
}

845 846
// INTERNAL API

B
Brian Anderson 已提交
847
// Closes the libuv stream belonging to `socket_data` on its iotask, blocks
// until the close callback has fired, then frees the stream handle through
// the runtime's `rust_uv_current_kernel_free`.
fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe {
848 849
    let closed_po = core::comm::Port::<()>();
    let closed_ch = core::comm::Chan(closed_po);
850 851 852
    // `close_data` is a local of this function; its address is attached to
    // the handle below and read by `tcp_socket_dtor_close_cb`. The blocking
    // recv further down keeps this function from returning before then.
    let close_data = {
        closed_ch: closed_ch
    };
T
Tim Chevalier 已提交
853
    let close_data_ptr = ptr::addr_of(&close_data);
854
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
855
    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
856 857
        log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?",
            stream_handle_ptr, loop_ptr));
858 859 860 861
        uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                       close_data_ptr);
        uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
    };
E
Eric Holk 已提交
862
    // Wait for `tcp_socket_dtor_close_cb` to signal on `closed_ch`.
    core::comm::recv(closed_po);
P
Paul Stansifer 已提交
863
    // NOTE(review): the message names `socket_data`, but the call that
    // follows frees `stream_handle_ptr`.
    log(debug, fmt!("about to free socket_data at %?", socket_data));
864 865
    rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                       as *libc::c_void);
866
    log(debug, ~"exiting dtor for tcp_socket");
867 868
}

869
// shared implementation for tcp::read
B
Brian Anderson 已提交
870 871
// Performs one read: starts reading, waits for a single result (with a
// timeout when `timeout_msecs` > 0, otherwise indefinitely), stops reading
// and returns that result. Returns a "TIMEOUT" error if the timed wait
// yields nothing, or the start error if reading could not be started.
fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
    -> result::Result<~[u8],TcpErrData> unsafe {
872
    log(debug, ~"starting tcp::read");
873
    let iotask = (*socket_data).iotask;
874
    let rs_result = read_start_common_impl(socket_data);
B
Brian Anderson 已提交
875 876
    if result::is_err(&rs_result) {
        let err_data = result::get_err(&rs_result);
877
        result::Err(err_data)
878 879
    }
    else {
880
        log(debug, ~"tcp::read before recv_timeout");
881 882
        // A zero timeout means "block until something arrives".
        let read_result = if timeout_msecs > 0u {
            timer::recv_timeout(
B
Brian Anderson 已提交
883
               iotask, timeout_msecs, result::get(&rs_result))
884
        } else {
B
Brian Anderson 已提交
885
            Some(core::comm::recv(result::get(&rs_result)))
886
        };
887
        log(debug, ~"tcp::read after recv_timeout");
888
        match move read_result {
B
Brian Anderson 已提交
889
          None => {
890
            log(debug, ~"tcp::read: timed out..");
891
            let err_data = {
892 893
                err_name: ~"TIMEOUT",
                err_msg: ~"req timed out"
894 895
            };
            // The result of stopping the read is discarded on both paths.
            read_stop_common_impl(socket_data);
896
            result::Err(err_data)
897
          }
898
          Some(move data_result) => {
899
            log(debug, ~"tcp::read got data");
900 901 902 903 904 905 906 907
            read_stop_common_impl(socket_data);
            data_result
          }
        }
    }
}

// shared impl for read_stop
B
Brian Anderson 已提交
908 909
// Calls `uv::ll::read_stop` for the socket's stream on its iotask and blocks
// until the outcome has been sent back: Ok(()) when the call returned 0,
// otherwise the loop's last error converted to `TcpErrData`.
fn read_stop_common_impl(socket_data: *TcpSocketData) ->
    result::Result<(), TcpErrData> unsafe {
910
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
B
Brian Anderson 已提交
911
    // Carries the outcome from the interact callback back to this function:
    // None on success, Some(error) on failure.
    let stop_po = core::comm::Port::<Option<TcpErrData>>();
912
    let stop_ch = core::comm::Chan(stop_po);
913
    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
914
        log(debug, ~"in interact cb for tcp::read_stop");
915
        match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
B
Brian Anderson 已提交
916
          0i32 => {
917
            log(debug, ~"successfully called uv_read_stop");
B
Brian Anderson 已提交
918
            core::comm::send(stop_ch, None);
919
          }
B
Brian Anderson 已提交
920
          _ => {
921
            log(debug, ~"failure in calling uv_read_stop");
922
            let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
923
            core::comm::send(stop_ch, Some(err_data.to_tcp_err()));
924 925 926
          }
        }
    };
E
Eric Holk 已提交
927
    match core::comm::recv(stop_po) {
928
      // NOTE(review): the value was already converted with `to_tcp_err`
      // before being sent; this second call looks redundant -- confirm.
      Some(ref err_data) => result::Err(err_data.to_tcp_err()),
929
      None => result::Ok(())
930 931 932 933
    }
}

// shared impl for read_start
B
Brian Anderson 已提交
934
// Calls `uv::ll::read_start` for the socket's stream on its iotask,
// registering `on_alloc_cb` and `on_tcp_read_cb`, and blocks until the
// outcome has been sent back. On success returns the socket's `reader_po`;
// otherwise returns the loop's last error converted to `TcpErrData`.
fn read_start_common_impl(socket_data: *TcpSocketData)
935
    -> result::Result<comm::Port<
B
Brian Anderson 已提交
936
        result::Result<~[u8], TcpErrData>>, TcpErrData> unsafe {
937
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
938 939
    // Carries the outcome from the interact callback back to this function:
    // None on success, Some(raw uv error) on failure.
    let start_po = core::comm::Port::<Option<uv::ll::uv_err_data>>();
    let start_ch = core::comm::Chan(start_po);
940
    log(debug, ~"in tcp::read_start before interact loop");
941
    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
942
        log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr));
943
        match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
944 945
                               on_alloc_cb,
                               on_tcp_read_cb) {
B
Brian Anderson 已提交
946
          0i32 => {
947
            log(debug, ~"success doing uv_read_start");
B
Brian Anderson 已提交
948
            core::comm::send(start_ch, None);
949
          }
B
Brian Anderson 已提交
950
          _ => {
951
            log(debug, ~"error attempting uv_read_start");
952
            let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
953
            core::comm::send(start_ch, Some(err_data));
954 955 956
          }
        }
    };
E
Eric Holk 已提交
957
    match core::comm::recv(start_po) {
958
      Some(ref err_data) => result::Err(err_data.to_tcp_err()),
959
      None => result::Ok((*socket_data).reader_po)
960 961 962
    }
}

963 964
// helper to convert a "class" vector of [u8] to a *[uv::ll::uv_buf_t]

965
// shared implementation used by write and write_future
B
Brian Anderson 已提交
966
// Issues a single `uv::ll::write` of `raw_write_data` on the socket's stream
// from the iotask, then blocks until a `TcpWriteResult` arrives on a local
// port. Returns Ok(()) on `TcpWriteSuccess`, otherwise the reported error.
fn write_common_impl(socket_data_ptr: *TcpSocketData,
967
                     raw_write_data: ~[u8])
B
Brian Anderson 已提交
968
    -> result::Result<(), TcpErrData> unsafe {
T
Tim Chevalier 已提交
969
    let write_req_ptr = ptr::addr_of(&((*socket_data_ptr).write_req));
970 971
    let stream_handle_ptr =
        (*socket_data_ptr).stream_handle_ptr;
972
    // One-element vector holding a uv buffer built from the pointer to, and
    // length of, `raw_write_data`.
    let write_buf_vec =  ~[ uv::ll::buf_init(
B
Brian Anderson 已提交
973
        vec::raw::to_ptr(raw_write_data),
974
        vec::len(raw_write_data)) ];
T
Tim Chevalier 已提交
975
    let write_buf_vec_ptr = ptr::addr_of(&write_buf_vec);
B
Brian Anderson 已提交
976
    let result_po = core::comm::Port::<TcpWriteResult>();
977
    // `write_data` is a local; its address is attached to the write request
    // and read back in `tcp_write_complete_cb`. The blocking recv at the end
    // keeps this function from returning before a result has been sent.
    let write_data = {
978
        result_ch: core::comm::Chan(result_po)
979
    };
T
Tim Chevalier 已提交
980
    let write_data_ptr = ptr::addr_of(&write_data);
981
    do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
982
        log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr));
983
        match uv::ll::write(write_req_ptr,
984 985 986
                          stream_handle_ptr,
                          write_buf_vec_ptr,
                          tcp_write_complete_cb) {
B
Brian Anderson 已提交
987
          0i32 => {
988
            log(debug, ~"uv_write() invoked successfully");
989 990
            // The request data is attached only after the write was
            // accepted. NOTE(review): this assumes the completion callback
            // cannot run before this closure returns -- confirm.
            uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
          }
B
Brian Anderson 已提交
991
          _ => {
992
            log(debug, ~"error invoking uv_write()");
993
            let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
994
            // No completion callback is relied on here; the error is
            // reported directly so the recv below does not wait on one.
            core::comm::send((*write_data_ptr).result_ch,
B
Brian Anderson 已提交
995
                       TcpWriteError(err_data.to_tcp_err()));
996 997 998
          }
        }
    };
999 1000 1001 1002
    // FIXME (#2656): Instead of passing unsafe pointers to local data,
    // and waiting here for the write to complete, we should transfer
    // ownership of everything to the I/O task and let it deal with the
    // aftermath, so we don't have to sit here blocking.
E
Eric Holk 已提交
1003
    match core::comm::recv(result_po) {
B
Brian Anderson 已提交
1004
      TcpWriteSuccess => result::Ok(()),
1005
      TcpWriteError(ref err_data) => result::Err(err_data.to_tcp_err())
1006 1007 1008
    }
}

B
Brian Anderson 已提交
1009 1010
// Wraps a raw pointer to a libuv TCP handle.
// NOTE(review): the name suggests this represents a not-yet-accepted
// incoming connection; its use is outside this part of the file -- confirm.
enum TcpNewConnection {
    NewTcpConn(*uv::ll::uv_tcp_t)
1011 1012
}

B
Brian Anderson 已提交
1013
// State read by the listen callbacks (`tcp_lfc_close_cb`,
// `tcp_lfc_on_connection_cb`) through the server handle's data pointer.
type TcpListenFcData = {
1014
    // Raw pointer to the server's libuv TCP handle.
    server_stream_ptr: *uv::ll::uv_tcp_t,
1015
    // Signalled by `tcp_lfc_close_cb` once the server stream has closed.
    stream_closed_ch: comm::Chan<()>,
B
Brian Anderson 已提交
1016
    // Receives Some(error) from `tcp_lfc_on_connection_cb` when a
    // connection callback reports a non-zero status.
    kill_ch: comm::Chan<Option<TcpErrData>>,
1017
    // Invoked with the server handle for each connection callback whose
    // status is 0.
    on_connect_cb: fn~(*uv::ll::uv_tcp_t),
B
Brian Anderson 已提交
1018
    iotask: IoTask,
1019 1020 1021
    // While false, `tcp_lfc_on_connection_cb` ignores connection callbacks.
    mut active: bool
};

G
Graydon Hoare 已提交
1022
// Close callback for the server stream: reads the `TcpListenFcData` from the
// handle's data pointer and signals its `stream_closed_ch`.
extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
1023
    let server_data_ptr = uv::ll::get_data_for_uv_handle(
B
Brian Anderson 已提交
1024
        handle) as *TcpListenFcData;
E
Eric Holk 已提交
1025
    core::comm::send((*server_data_ptr).stream_closed_ch, ());
1026 1027
}

G
Graydon Hoare 已提交
1028
// Connection callback for the server stream. While the server data is marked
// active: a zero `status` invokes the stored `on_connect_cb` with the server
// handle; any other status sends the loop's last error on `kill_ch` and
// clears `active`. When inactive, the callback does nothing.
extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
1029 1030
                                     status: libc::c_int) unsafe {
    let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
B
Brian Anderson 已提交
1031
        as *TcpListenFcData;
1032
    let kill_ch = (*server_data_ptr).kill_ch;
1033
    if (*server_data_ptr).active {
1034
        match status {
B
Brian Anderson 已提交
1035 1036
          0i32 => (*server_data_ptr).on_connect_cb(handle),
          _ => {
1037
            let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
E
Eric Holk 已提交
1038
            core::comm::send(kill_ch,
B
Brian Anderson 已提交
1039
                       Some(uv::ll::get_last_err_data(loop_ptr)
1040 1041 1042 1043 1044 1045 1046
                            .to_tcp_err()));
            // Later connection callbacks are ignored once an error was sent.
            (*server_data_ptr).active = false;
          }
        }
    }
}

1047
// Allocates a block of `rust_uv_helper_uv_tcp_t_size()` bytes through the
// runtime's `rust_uv_current_kernel_malloc` and returns it as a raw
// `uv_tcp_t` pointer. `tear_down_socket_data` releases a stream handle with
// the matching `rust_uv_current_kernel_free`.
fn malloc_uv_tcp_t() -> *uv::ll::uv_tcp_t unsafe {
1048
    rustrt::rust_uv_current_kernel_malloc(
1049
        rustrt::rust_uv_helper_uv_tcp_t_size()) as *uv::ll::uv_tcp_t
1050 1051
}

B
Brian Anderson 已提交
1052 1053 1054
// Outcome of a connect attempt: a connected socket, or error data.
enum TcpConnectResult {
    TcpConnected(TcpSocket),
    TcpConnectError(TcpErrData)
1055 1056
}

B
Brian Anderson 已提交
1057 1058 1059
// Outcome of a write, sent on `WriteReqData.result_ch` and turned into a
// `result::Result` by `write_common_impl`.
enum TcpWriteResult {
    TcpWriteSuccess,
    TcpWriteError(TcpErrData)
1060 1061
}

B
Brian Anderson 已提交
1062 1063 1064
// Outcome of starting a read: a port of `TcpReadResult` values, or error
// data.
enum TcpReadStartResult {
    TcpReadStartSuccess(comm::Port<TcpReadResult>),
    TcpReadStartError(TcpErrData)
1065 1066
}

B
Brian Anderson 已提交
1067 1068 1069 1070
// A single read event: a chunk of bytes, a "done" marker, or error data.
enum TcpReadResult {
    TcpReadData(~[u8]),
    TcpReadDone,
    TcpReadErr(TcpErrData)
1071 1072
}

B
Brian Anderson 已提交
1073 1074
// Conversion of an error value into this module's `TcpErrData`.
trait ToTcpErr {
    fn to_tcp_err() -> TcpErrData;
1075 1076
}

B
Brian Anderson 已提交
1077 1078
// Builds a `TcpErrData` from a libuv error record by carrying over its
// `err_name` and `err_msg` fields.
impl uv::ll::uv_err_data: ToTcpErr {
    fn to_tcp_err() -> TcpErrData {
1079 1080 1081 1082
        { err_name: self.err_name, err_msg: self.err_msg }
    }
}

G
Graydon Hoare 已提交
1083
// Read callback registered by `read_start_common_impl`. Looks up the
// `TcpSocketData` attached to `stream` and forwards the event on its
// `reader_ch`: an Err for `nread` == -1, nothing for 0, and otherwise an Ok
// with `nread` bytes taken from `buf`. The buffer's base is freed in every
// case before returning.
extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
1084 1085
                    nread: libc::ssize_t,
                    ++buf: uv::ll::uv_buf_t) unsafe {
P
Paul Stansifer 已提交
1086 1087
    log(debug, fmt!("entering on_tcp_read_cb stream: %? nread: %?",
                    stream, nread));
1088 1089
    let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
    let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
B
Brian Anderson 已提交
1090
        as *TcpSocketData;
1091
    match nread as int {
1092
      // incoming err.. probably eof
B
Brian Anderson 已提交
1093
      -1 => {
1094
        let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
P
Paul Stansifer 已提交
1095 1096
        log(debug, fmt!("on_tcp_read_cb: incoming err.. name %? msg %?",
                        err_data.err_name, err_data.err_msg));
1097
        let reader_ch = (*socket_data_ptr).reader_ch;
1098
        core::comm::send(reader_ch, result::Err(err_data));
1099 1100
      }
      // do nothing .. unneeded buf
B
Brian Anderson 已提交
1101
      0 => (),
1102
      // have data
      // NOTE(review): this arm also matches any negative value other than
      // -1, which would then be cast to uint below -- confirm libuv only
      // ever reports errors as -1 here.
B
Brian Anderson 已提交
1103
      _ => {
1104
        // we have data
P
Paul Stansifer 已提交
1105
        log(debug, fmt!("tcp on_read_cb nread: %d", nread as int));
1106
        let reader_ch = (*socket_data_ptr).reader_ch;
1107
        let buf_base = uv::ll::get_base_from_buf(buf);
B
Brian Anderson 已提交
1108
        // Build a vector from the first `nread` bytes at `buf_base`.
        let new_bytes = vec::raw::from_buf(buf_base, nread as uint);
1109
        core::comm::send(reader_ch, result::Ok(new_bytes));
1110 1111 1112
      }
    }
    uv::ll::free_base_of_buf(buf);
1113
    log(debug, ~"exiting on_tcp_read_cb");
1114 1115
}

G
Graydon Hoare 已提交
1116
// Allocation callback registered by `read_start_common_impl`: allocates
// `suggested_size` bytes with `uv::ll::malloc_buf_base_of` and returns them
// wrapped in a uv buffer of that size. `on_tcp_read_cb` frees the base.
extern fn on_alloc_cb(handle: *libc::c_void,
B
Brian Anderson 已提交
1117
                      suggested_size: size_t)
1118
    -> uv::ll::uv_buf_t unsafe {
1119
    log(debug, ~"tcp read on_alloc_cb!");
1120
    let char_ptr = uv::ll::malloc_buf_base_of(suggested_size);
P
Paul Stansifer 已提交
1121
    log(debug, fmt!("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
1122 1123
                     handle,
                     char_ptr as uint,
P
Paul Stansifer 已提交
1124
                     suggested_size as uint));
1125
    uv::ll::buf_init(char_ptr, suggested_size as uint)
1126
}
1127

B
Brian Anderson 已提交
1128
// Data attached to a stream handle by `tear_down_socket_data` so that
// `tcp_socket_dtor_close_cb` can signal that the close has completed.
type TcpSocketCloseData = {
1129
    closed_ch: comm::Chan<()>
1130 1131
};

G
Graydon Hoare 已提交
1132
// Close callback used by `tear_down_socket_data`: reads the
// `TcpSocketCloseData` from the handle's data pointer and signals its
// `closed_ch`.
extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
1133
    let data = uv::ll::get_data_for_uv_handle(handle)
B
Brian Anderson 已提交
1134
        as *TcpSocketCloseData;
1135
    let closed_ch = (*data).closed_ch;
E
Eric Holk 已提交
1136
    core::comm::send(closed_ch, ());
1137
    log(debug, ~"tcp_socket_dtor_close_cb exiting..");
1138 1139
}

G
Graydon Hoare 已提交
1140
// Completion callback passed to `uv::ll::write` by `write_common_impl`.
// Reads the `WriteReqData` attached to the write request and reports the
// outcome on its `result_ch`: `TcpWriteSuccess` when `status` is 0,
// otherwise `TcpWriteError` carrying the loop's last error.
extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
                              status: libc::c_int) unsafe {
    let write_data_ptr = uv::ll::get_data_for_req(write_req)
        as *WriteReqData;
    if status == 0i32 {
        log(debug, ~"successful write complete");
        core::comm::send((*write_data_ptr).result_ch, TcpWriteSuccess);
    } else {
        // The loop is reached through the stream the request was issued on.
        let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
            write_req);
        let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        log(debug, ~"failure to write");
        // Convert explicitly to `TcpErrData`, as the other producers of
        // `TcpWriteError` in this module do, instead of relying on the raw
        // uv error record happening to have the same shape.
        core::comm::send((*write_data_ptr).result_ch,
                         TcpWriteError(err_data.to_tcp_err()));
    }
}

B
Brian Anderson 已提交
1158 1159
// Data attached to a write request by `write_common_impl`;
// `tcp_write_complete_cb` sends the write's outcome on `result_ch`.
type WriteReqData = {
    result_ch: comm::Chan<TcpWriteResult>
1160 1161
};

B
Brian Anderson 已提交
1162 1163
// Data read by the connect callbacks through the connect request (and, on
// failure, through the stream handle).
type ConnectReqData = {
    // Receives `ConnSuccess` or `ConnFailure` from
    // `tcp_connect_on_connect_cb`.
    result_ch: comm::Chan<ConnAttempt>,
1164
    // Signalled by `stream_error_close_cb` once a failed stream has closed.
    closed_signal_ch: comm::Chan<()>
J
Jeff Olson 已提交
1165 1166
};

G
Graydon Hoare 已提交
1167
// Close callback used when a connect attempt failed (registered by
// `tcp_connect_on_connect_cb`): reads the `ConnectReqData` from the handle's
// data pointer and signals its `closed_signal_ch`.
extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let data = uv::ll::get_data_for_uv_handle(handle) as
        *ConnectReqData;
    core::comm::send((*data).closed_signal_ch, ());
    // Log message now spells the function name correctly ("stream", not
    // "steam") so it can be found by searching for the callback's name.
    log(debug, fmt!("exiting stream_error_close_cb for %?", handle));
}

G
Graydon Hoare 已提交
1174
// Close callback that only logs the handle that was closed.
extern fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
P
Paul Stansifer 已提交
1175
    log(debug, fmt!("closed client tcp handle %?", handle));
J
Jeff Olson 已提交
1176 1177
}

G
Graydon Hoare 已提交
1178
// Connect callback. Reads the `ConnectReqData` attached to the connect
// request and reports on its `result_ch`: `ConnSuccess` when `status` is 0,
// otherwise `ConnFailure` with the loop's last error. On failure the stream
// is also closed, with `stream_error_close_cb` as the close callback.
extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
J
Jeff Olson 已提交
1179 1180
                                   status: libc::c_int) unsafe {
    let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
B
Brian Anderson 已提交
1181
                      as *ConnectReqData);
J
Jeff Olson 已提交
1182
    let result_ch = (*conn_data_ptr).result_ch;
P
Paul Stansifer 已提交
1183
    log(debug, fmt!("tcp_connect result_ch %?", result_ch));
J
Jeff Olson 已提交
1184 1185
    let tcp_stream_ptr =
        uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
1186
    match status {
B
Brian Anderson 已提交
1187
      0i32 => {
1188
        log(debug, ~"successful tcp connection!");
B
Brian Anderson 已提交
1189
        core::comm::send(result_ch, ConnSuccess);
J
Jeff Olson 已提交
1190
      }
B
Brian Anderson 已提交
1191
      _ => {
1192
        log(debug, ~"error in tcp_connect_on_connect_cb");
J
Jeff Olson 已提交
1193 1194
        let loop_ptr = uv::ll::get_loop_for_uv_handle(tcp_stream_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
P
Paul Stansifer 已提交
1195 1196
        log(debug, fmt!("err_data %? %?", err_data.err_name,
                        err_data.err_msg));
B
Brian Anderson 已提交
1197
        core::comm::send(result_ch, ConnFailure(err_data));
J
Jeff Olson 已提交
1198 1199 1200 1201 1202
        // Attach the connect data to the stream handle itself so that
        // `stream_error_close_cb` can find `closed_signal_ch`.
        uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
                                       conn_data_ptr);
        uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
      }
    }
1203
    log(debug, ~"leaving tcp_connect_on_connect_cb");
J
Jeff Olson 已提交
1204 1205
}

B
Brian Anderson 已提交
1206 1207 1208
// Outcome reported by `tcp_connect_on_connect_cb`: success, or the raw
// libuv error data for the failure.
enum ConnAttempt {
    ConnSuccess,
    ConnFailure(uv::ll::uv_err_data)
J
Jeff Olson 已提交
1209 1210
}

B
Brian Anderson 已提交
1211 1212 1213
// Per-socket state shared between the public API and the libuv callbacks.
type TcpSocketData = {
    // Port handed out by `read_start_common_impl`; read results arrive here.
    reader_po: comm::Port<result::Result<~[u8], TcpErrData>>,
    // Channel `on_tcp_read_cb` sends read results on.
    reader_ch: comm::Chan<result::Result<~[u8], TcpErrData>>,
1214
    // Raw pointer to the libuv TCP handle; freed in `tear_down_socket_data`.
    stream_handle_ptr: *uv::ll::uv_tcp_t,
J
Jeff Olson 已提交
1215
    connect_req: uv::ll::uv_connect_t,
1216
    // Write request whose address `write_common_impl` passes to
    // `uv::ll::write`.
    write_req: uv::ll::uv_write_t,
B
Brian Anderson 已提交
1217
    // The iotask on which all libuv calls for this socket are made.
    iotask: IoTask
J
Jeff Olson 已提交
1218 1219
};

B
Brian Anderson 已提交
1220 1221
// State behind a buffered socket: the wrapped socket plus the bytes that
// have been received from it but not yet handed to a reader.
type TcpBufferedSocketData = {
    sock: TcpSocket,
1222
    mut buf: ~[u8]
1223
};
J
Jeff Olson 已提交
1224

1225
//#[cfg(test)]
1226
mod test {
1227
    #[legacy_exports];
1228
    // FIXME don't run on fbsd or linux 32 bit (#2064)
1229 1230 1231 1232
    #[cfg(target_os="win32")]
    #[cfg(target_os="darwin")]
    #[cfg(target_os="linux")]
    mod tcp_ipv4_server_and_client_test {
1233
        #[legacy_exports];
1234 1235
        #[cfg(target_arch="x86_64")]
        mod impl64 {
1236
            #[legacy_exports];
1237 1238 1239 1240
            #[test]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1241
            #[test]
1242 1243 1244
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1245 1246 1247 1248 1249 1250 1251
            #[test]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
1252
            }
1253 1254 1255 1256
            #[test]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
            }
1257

1258 1259 1260
        }
        #[cfg(target_arch="x86")]
        mod impl32 {
1261
            #[legacy_exports];
1262 1263 1264 1265 1266
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1267 1268
            #[test]
            #[ignore(cfg(target_os = "linux"))]
1269 1270 1271
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1272 1273 1274 1275 1276 1277 1278
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            #[ignore(cfg(target_os = "linux"))]
1279
            #[ignore(cfg(windows), reason = "deadlocking bots")]
1280 1281 1282
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
            }
1283 1284 1285 1286
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
1287
            }
1288
        }
1289
    }
1290
    fn impl_gl_tcp_ipv4_server_and_client() {
1291
        let hl_loop = uv::global_loop::get();
1292
        let server_ip = ~"127.0.0.1";
1293
        let server_port = 8888u;
1294 1295
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1296

1297 1298
        let server_result_po = core::comm::Port::<~str>();
        let server_result_ch = core::comm::Chan(server_result_po);
1299

1300 1301
        let cont_po = core::comm::Port::<()>();
        let cont_ch = core::comm::Chan(cont_po);
1302
        // server
1303
        do task::spawn_sched(task::ManualThreads(1u)) {
B
Brian Anderson 已提交
1304
            let actual_req = do comm::listen |server_ch| {
1305 1306 1307 1308
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
1309
                    server_ch,
1310 1311
                    cont_ch,
                    hl_loop)
1312 1313 1314
            };
            server_result_ch.send(actual_req);
        };
E
Eric Holk 已提交
1315
        core::comm::recv(cont_po);
1316
        // client
1317
        log(debug, ~"server started, firing up client..");
E
Eric Holk 已提交
1318
        let actual_resp_result = do core::comm::listen |client_ch| {
1319 1320 1321 1322
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1323 1324
                client_ch,
                hl_loop)
1325
        };
1326
        assert actual_resp_result.is_ok();
1327
        let actual_resp = actual_resp_result.get();
E
Eric Holk 已提交
1328
        let actual_req = core::comm::recv(server_result_po);
P
Paul Stansifer 已提交
1329 1330 1331 1332
        log(debug, fmt!("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, fmt!("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
1333 1334
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
1335
    }
1336
    // Runs the test client against 127.0.0.1:8889 without starting a server
    // in this function, and requires the client result to be an error
    // matching `ConnectionRefused`; any other error fails the test.
    fn impl_gl_tcp_ipv4_client_error_connection_refused() {
1337
        let hl_loop = uv::global_loop::get();
1338
        let server_ip = ~"127.0.0.1";
1339
        let server_port = 8889u;
1340
        let expected_req = ~"ping";
1341
        // client
1342
        log(debug, ~"firing up client..");
E
Eric Holk 已提交
1343
        let actual_resp_result = do core::comm::listen |client_ch| {
1344 1345 1346 1347 1348 1349 1350
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
                client_ch,
                hl_loop)
        };
1351
        // `get_err` is called unconditionally on the client result.
        // NOTE(review): presumably this fails the task if the connection
        // unexpectedly succeeded -- confirm.
        match actual_resp_result.get_err() {
B
Brian Anderson 已提交
1352
          ConnectionRefused => (),
B
Brian Anderson 已提交
1353
          _ => fail ~"unknown error.. expected connection_refused"
1354 1355
        }
    }
1356 1357
    fn impl_gl_tcp_ipv4_server_address_in_use() {
        let hl_loop = uv::global_loop::get();
1358
        let server_ip = ~"127.0.0.1";
1359
        let server_port = 8890u;
1360 1361
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1362

1363 1364
        let server_result_po = core::comm::Port::<~str>();
        let server_result_ch = core::comm::Chan(server_result_po);
1365

1366 1367
        let cont_po = core::comm::Port::<()>();
        let cont_ch = core::comm::Chan(cont_po);
1368
        // server
1369
        do task::spawn_sched(task::ManualThreads(1u)) {
B
Brian Anderson 已提交
1370
            let actual_req = do comm::listen |server_ch| {
1371
                run_tcp_test_server(
1372 1373 1374 1375
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
1376 1377
                    cont_ch,
                    hl_loop)
1378 1379 1380
            };
            server_result_ch.send(actual_req);
        };
E
Eric Holk 已提交
1381
        core::comm::recv(cont_po);
1382 1383 1384 1385 1386 1387
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        // client.. just doing this so that the first server tears down
1388
        log(debug, ~"server started, firing up client..");
E
Eric Holk 已提交
1389
        do core::comm::listen |client_ch| {
1390 1391 1392 1393
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1394 1395
                client_ch,
                hl_loop)
1396
        };
1397
        match listen_err {
B
Brian Anderson 已提交
1398
          AddressInUse => {
1399 1400
            assert true;
          }
B
Brian Anderson 已提交
1401
          _ => {
1402
            fail ~"expected address_in_use listen error,"+
B
Brian Anderson 已提交
1403
                ~"but got a different error varient. check logs.";
1404 1405 1406 1407 1408
          }
        }
    }
    fn impl_gl_tcp_ipv4_server_access_denied() {
        let hl_loop = uv::global_loop::get();
1409
        let server_ip = ~"127.0.0.1";
1410 1411 1412 1413 1414 1415
        let server_port = 80u;
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
1416
        match listen_err {
B
Brian Anderson 已提交
1417
          AccessDenied => {
1418 1419
            assert true;
          }
B
Brian Anderson 已提交
1420
          _ => {
1421 1422
            fail ~"expected address_in_use listen error,"+
                      ~"but got a different error varient. check logs.";
1423 1424 1425
          }
        }
    }
1426
    // Round-trip test of the buffered socket wrapper: a server task answers
    // a "ping" request with "pong" while the client talks to it through
    // `socket_buf` using `buf_write`/`buf_read`.
    //
    // The function is currently a no-op. Its whole body is commented out
    // because, per the XXX note below, compiling it causes an ICE. The
    // disabled code is kept as written so it can be revived later; it may
    // no longer match the current signatures (e.g. it calls
    // `result::is_err` without borrowing its argument).
    fn impl_gl_tcp_ipv4_server_client_reader_writer() {
        /*
         XXX: Causes an ICE.

        let iotask = uv::global_loop::get();
        let server_ip = ~"127.0.0.1";
        let server_port = 8891u;
        let expected_req = ~"ping";
        let expected_resp = ~"pong";

        let server_result_po = core::comm::port::<~str>();
        let server_result_ch = core::comm::chan(server_result_po);

        let cont_po = core::comm::port::<()>();
        let cont_ch = core::comm::chan(cont_po);
        // server
        do task::spawn_sched(task::ManualThreads(1u)) {
            let actual_req = do comm::listen |server_ch| {
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
                    cont_ch,
                    iotask)
            };
            server_result_ch.send(actual_req);
        };
        core::comm::recv(cont_po);
        // client
        let server_addr = ip::v4::parse_addr(server_ip);
        let conn_result = connect(server_addr, server_port, iotask);
        if result::is_err(conn_result) {
            assert false;
        }
        let sock_buf = @socket_buf(result::unwrap(conn_result));
        buf_write(sock_buf, expected_req);

        // so contrived!
        let actual_resp = do str::as_bytes(expected_resp) |resp_buf| {
            buf_read(sock_buf, vec::len(resp_buf))
        };

        let actual_req = core::comm::recv(server_result_po);
        log(debug, fmt!("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, fmt!("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
        */
    }
    fn buf_write<W:io::Writer>(+w: &W, val: &str) {
P
Paul Stansifer 已提交
1480
        log(debug, fmt!("BUF_WRITE: val len %?", str::len(val)));
1481
        do str::byte_slice(val) |b_slice| {
P
Paul Stansifer 已提交
1482 1483
            log(debug, fmt!("BUF_WRITE: b_slice len %?",
                            vec::len(b_slice)));
1484
            w.write(b_slice)
1485 1486 1487
        }
    }

    // Test helper: requests `len` bytes from the reader `r` with
    // `read_bytes` and returns them converted to a string by
    // `str::from_bytes`.
    //
    // The number of bytes actually returned by the reader is logged at
    // debug level.
    fn buf_read<R:io::Reader>(+r: &R, len: uint) -> ~str {
        let new_bytes = (*r).read_bytes(len);
        log(debug, fmt!("in buf_read.. new_bytes len: %?",
                        vec::len(new_bytes)));
        str::from_bytes(new_bytes)
    }
    fn run_tcp_test_server(server_ip: &str, server_port: uint, +resp: ~str,
1496 1497
                          server_ch: comm::Chan<~str>,
                          cont_ch: comm::Chan<()>,
B
Brian Anderson 已提交
1498
                          iotask: IoTask) -> ~str {
1499
        let server_ip_addr = ip::v4::parse_addr(server_ip);
T
Tim Chevalier 已提交
1500 1501
        let listen_result = listen(move server_ip_addr, server_port, 128,
                                   iotask,
1502
            // on_establish_cb -- called when listener is set up
1503
            |kill_ch| {
P
Paul Stansifer 已提交
1504 1505
                log(debug, fmt!("establish_cb %?",
                    kill_ch));
E
Eric Holk 已提交
1506
                core::comm::send(cont_ch, ());
1507 1508 1509
            },
            // risky to run this on the loop, but some users
            // will want the POWER
1510
            |new_conn, kill_ch| {
1511
            log(debug, ~"SERVER: new connection!");
1512
            do comm::listen |cont_ch| {
1513
                do task::spawn_sched(task::ManualThreads(1u)) {
1514
                    log(debug, ~"SERVER: starting worker for new req");
1515 1516

                    let accept_result = accept(new_conn);
1517
                    log(debug, ~"SERVER: after accept()");
B
Brian Anderson 已提交
1518
                    if result::is_err(&accept_result) {
1519
                        log(debug, ~"SERVER: error accept connection");
B
Brian Anderson 已提交
1520
                        let err_data = result::get_err(&accept_result);
B
Brian Anderson 已提交
1521
                        core::comm::send(kill_ch, Some(err_data));
1522
                        log(debug,
1523
                            ~"SERVER/WORKER: send on err cont ch");
1524 1525 1526 1527
                        cont_ch.send(());
                    }
                    else {
                        log(debug,
1528
                            ~"SERVER/WORKER: send on cont ch");
1529
                        cont_ch.send(());
1530
                        let sock = result::unwrap(move accept_result);
1531 1532
                        log(debug, ~"SERVER: successfully accepted"+
                            ~"connection!");
B
Brian Anderson 已提交
1533
                        let received_req_bytes = read(&sock, 0u);
1534 1535
                        match move received_req_bytes {
                          result::Ok(move data) => {
1536
                            log(debug, ~"SERVER: got REQ str::from_bytes..");
P
Paul Stansifer 已提交
1537 1538
                            log(debug, fmt!("SERVER: REQ data len: %?",
                                            vec::len(data)));
1539 1540
                            server_ch.send(
                                str::from_bytes(data));
1541
                            log(debug, ~"SERVER: before write");
B
Brian Anderson 已提交
1542
                            tcp_write_single(&sock, str::to_bytes(resp));
1543
                            log(debug, ~"SERVER: after write.. die");
B
Brian Anderson 已提交
1544
                            core::comm::send(kill_ch, None);
1545
                          }
1546
                          result::Err(move err_data) => {
P
Paul Stansifer 已提交
1547 1548
                            log(debug, fmt!("SERVER: error recvd: %s %s",
                                err_data.err_name, err_data.err_msg));
B
Brian Anderson 已提交
1549
                            core::comm::send(kill_ch, Some(err_data));
1550
                            server_ch.send(~"");
1551
                          }
1552
                        }
1553
                        log(debug, ~"SERVER: worker spinning down");
1554
                    }
1555
                }
1556
                log(debug, ~"SERVER: waiting to recv on cont_ch");
1557 1558
                cont_ch.recv()
            };
1559
            log(debug, ~"SERVER: recv'd on cont_ch..leaving listen cb");
1560 1561
        });
        // err check on listen_result
B
Brian Anderson 已提交
1562 1563
        if result::is_err(&listen_result) {
            match result::get_err(&listen_result) {
1564
              GenericListenErr(ref name, ref msg) => {
P
Paul Stansifer 已提交
1565
                fail fmt!("SERVER: exited abnormally name %s msg %s",
1566
                                *name, *msg);
1567
              }
B
Brian Anderson 已提交
1568
              AccessDenied => {
1569
                fail ~"SERVER: exited abnormally, got access denied..";
1570
              }
B
Brian Anderson 已提交
1571
              AddressInUse => {
1572
                fail ~"SERVER: exited abnormally, got address in use...";
1573
              }
1574
            }
1575
        }
1576
        let ret_val = server_ch.recv();
P
Paul Stansifer 已提交
1577
        log(debug, fmt!("SERVER: exited and got return val: '%s'", ret_val));
1578 1579 1580
        ret_val
    }

    fn run_tcp_test_server_fail(server_ip: &str, server_port: uint,
B
Brian Anderson 已提交
1582
                          iotask: IoTask) -> TcpListenErrData {
1583
        let server_ip_addr = ip::v4::parse_addr(server_ip);
T
Tim Chevalier 已提交
1584 1585
        let listen_result = listen(move server_ip_addr, server_port, 128,
                                   iotask,
1586
            // on_establish_cb -- called when listener is set up
1587
            |kill_ch| {
P
Paul Stansifer 已提交
1588 1589
                log(debug, fmt!("establish_cb %?",
                    kill_ch));
1590
            },
1591
            |new_conn, kill_ch| {
P
Paul Stansifer 已提交
1592 1593
                fail fmt!("SERVER: shouldn't be called.. %? %?",
                           new_conn, kill_ch);
1594 1595
        });
        // err check on listen_result
B
Brian Anderson 已提交
1596 1597
        if result::is_err(&listen_result) {
            result::get_err(&listen_result)
1598 1599
        }
        else {
1600
            fail ~"SERVER: did not fail as expected"
1601 1602 1603
        }
    }

    fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str,
1605
                          client_ch: comm::Chan<~str>,
B
Brian Anderson 已提交
1606
                          iotask: IoTask) -> result::Result<~str,
B
Brian Anderson 已提交
1607
                                                    TcpConnectErrData> {
1608
        let server_ip_addr = ip::v4::parse_addr(server_ip);
1609

1610
        log(debug, ~"CLIENT: starting..");
T
Tim Chevalier 已提交
1611 1612
        let connect_result = connect(move server_ip_addr, server_port,
                                     iotask);
B
Brian Anderson 已提交
1613
        if result::is_err(&connect_result) {
1614
            log(debug, ~"CLIENT: failed to connect");
B
Brian Anderson 已提交
1615
            let err_data = result::get_err(&connect_result);
1616
            Err(err_data)
1617
        }
1618
        else {
1619
            let sock = result::unwrap(move connect_result);
1620
            let resp_bytes = str::to_bytes(resp);
B
Brian Anderson 已提交
1621
            tcp_write_single(&sock, resp_bytes);
1622
            let read_result = sock.read(0u);
1623
            if read_result.is_err() {
1624
                log(debug, ~"CLIENT: failure to read");
1625
                Ok(~"")
1626
            }
1627 1628 1629
            else {
                client_ch.send(str::from_bytes(read_result.get()));
                let ret_val = client_ch.recv();
P
Paul Stansifer 已提交
1630 1631
                log(debug, fmt!("CLIENT: after client_ch recv ret: '%s'",
                   ret_val));
1632
                Ok(ret_val)
1633 1634
            }
        }
1635 1636
    }

    fn tcp_write_single(sock: &TcpSocket, val: ~[u8]) {
1638 1639
        let write_result_future = sock.write_future(val);
        let write_result = write_result_future.get();
B
Brian Anderson 已提交
1640
        if result::is_err(&write_result) {
1641
            log(debug, ~"tcp_write_single: write failed!");
B
Brian Anderson 已提交
1642
            let err_data = result::get_err(&write_result);
P
Paul Stansifer 已提交
1643 1644
            log(debug, fmt!("tcp_write_single err name: %s msg: %s",
                err_data.err_name, err_data.err_msg));
1645
            // meh. torn on what to do here.
1646
            fail ~"tcp_write_single failed";
1647
        }
1648
    }
}