net_tcp.rs 58.8 KB
Newer Older
1
//! High-level interface to libuv's TCP functionality
2

J
Jeff Olson 已提交
3
import ip = net_ip;
4 5
import uv::iotask;
import uv::iotask::iotask;
6
import comm::methods;
7
import future_spawn = future::spawn;
8
import future::extensions;
J
Jeff Olson 已提交
9 10
// FIXME #1935
// should be able to, but can't atm, replace w/ result::{result, extensions};
11 12 13
import result::*;
import libc::size_t;
import str::extensions;
J
Jeff Olson 已提交
14

15
// tcp interfaces
16
export tcp_socket;
17 18
// buffered socket
export tcp_socket_buf, socket_buf;
19 20
// errors
export tcp_err_data, tcp_connect_err_data;
21
// operations on a tcp_socket
22
export write, write_future, read_start, read_stop;
23
// tcp server stuff
24
export listen, accept;
25 26
// tcp client stuff
export connect;
J
Jeff Olson 已提交
27

28
#[nolink]
29
extern mod rustrt {
30
    fn rust_uv_current_kernel_malloc(size: libc::c_uint) -> *libc::c_void;
31
    fn rust_uv_current_kernel_free(mem: *libc::c_void);
32
    fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
33 34
}

35 36 37 38 39 40 41
/**
 * Encapsulates an open TCP/IP connection through libuv
 *
 * `tcp_socket` is non-copyable/sendable and automagically handles closing the
 * underlying libuv data structures when it goes out of scope. This is the
 * data structure that is used for read/write operations over a TCP stream.
 */
42 43 44 45
class tcp_socket {
  let socket_data: @tcp_socket_data;
  new(socket_data: @tcp_socket_data) { self.socket_data = socket_data; }
  drop {
46
    unsafe {
47
        tear_down_socket_data(self.socket_data)
48 49
    }
  }
J
Jeff Olson 已提交
50 51
}

52 53 54 55 56 57
/**
 * A buffered wrapper for `net::tcp::tcp_socket`
 *
 * It is created with a call to `net::tcp::socket_buf()` and has impls that
 * satisfy both the `io::reader` and `io::writer` ifaces.
 */
58 59 60
class tcp_socket_buf {
  let data: @tcp_buffered_socket_data;
  new(data: @tcp_buffered_socket_data) { self.data = data; }
61 62
}

63
/// Contains raw, string-based, error information returned from libuv
64 65 66 67
type tcp_err_data = {
    err_name: str,
    err_msg: str
};
68
/// Details returned as part of a `result::err` result from `tcp::listen`
69
enum tcp_listen_err_data {
70 71 72 73
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
74
    generic_listen_err(str, str),
75 76 77 78 79 80 81
    /**
     * Failed to bind to the requested IP/Port, because it is already in use.
     *
     * # Possible Causes
     *
     * * Attempting to bind to a port already bound to another listener
     */
82
    address_in_use,
83 84 85 86 87 88 89 90 91 92
    /**
     * Request to bind to an IP/Port was denied by the system.
     *
     * # Possible Causes
     *
     * * Attemping to binding to an IP/Port as a non-Administrator
     *   on Windows Vista+
     * * Attempting to bind, as a non-priv'd
     *   user, to 'privileged' ports (< 1024) on *nix
     */
93 94
    access_denied
}
95
/// Details returned as part of a `result::err` result from `tcp::connect`
96
enum tcp_connect_err_data {
97 98 99 100
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
101
    generic_connect_err(str, str),
102
    /// Invalid IP or invalid port
103 104
    connection_refused
}
105

106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
/**
 * Initiate a client connection over TCP/IP
 *
 * # Arguments
 *
 * * `input_ip` - The IP address (versions 4 or 6) of the remote host
 * * `port` - the unsigned integer of the desired remote host port
 * * `iotask` - a `uv::iotask` that the tcp request will run on
 *
 * # Returns
 *
 * A `result` that, if the operation succeeds, contains a
 * `net::net::tcp_socket` that can be used to send and receive data to/from
 * the remote host. In the event of failure, a
 * `net::tcp::tcp_connect_err_data` instance will be returned
 */
122
fn connect(-input_ip: ip::ip_addr, port: uint,
123
           iotask: iotask)
124
    -> result::result<tcp_socket, tcp_connect_err_data> unsafe {
J
Jeff Olson 已提交
125 126 127 128 129 130 131
    let result_po = comm::port::<conn_attempt>();
    let closed_signal_po = comm::port::<()>();
    let conn_data = {
        result_ch: comm::chan(result_po),
        closed_signal_ch: comm::chan(closed_signal_po)
    };
    let conn_data_ptr = ptr::addr_of(conn_data);
132
    let reader_po = comm::port::<result::result<~[u8], tcp_err_data>>();
133 134 135
    let stream_handle_ptr = malloc_uv_tcp_t();
    *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
    let socket_data = @{
136 137
        reader_po: reader_po,
        reader_ch: comm::chan(reader_po),
138 139 140
        stream_handle_ptr: stream_handle_ptr,
        connect_req: uv::ll::connect_t(),
        write_req: uv::ll::write_t(),
141
        iotask: iotask
J
Jeff Olson 已提交
142
    };
143
    let socket_data_ptr = ptr::addr_of(*socket_data);
J
Jeff Olson 已提交
144 145 146
    log(debug, #fmt("tcp_connect result_ch %?", conn_data.result_ch));
    // get an unsafe representation of our stream_handle_ptr that
    // we can send into the interact cb to be handled in libuv..
147
    log(debug, #fmt("stream_handle_ptr outside interact %?",
148
        stream_handle_ptr));
B
Brian Anderson 已提交
149
    do iotask::interact(iotask) |loop_ptr| {
J
Jeff Olson 已提交
150
        log(debug, "in interact cb for tcp client connect..");
151
        log(debug, #fmt("stream_handle_ptr in interact %?",
J
Jeff Olson 已提交
152 153 154 155 156 157 158 159 160
            stream_handle_ptr));
        alt uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
          0i32 {
            log(debug, "tcp_init successful");
            alt input_ip {
              ipv4 {
                log(debug, "dealing w/ ipv4 connection..");
                let connect_req_ptr =
                    ptr::addr_of((*socket_data_ptr).connect_req);
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
                let addr_str = ip::format_addr(input_ip);
                let connect_result = alt input_ip {
                  ip::ipv4(addr) {
                    // have to "recreate" the sockaddr_in/6
                    // since the ip_addr discards the port
                    // info.. should probably add an additional
                    // rust type that actually is closer to
                    // what the libuv API expects (ip str + port num)
                    log(debug, #fmt("addr: %?", addr));
                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                    uv::ll::tcp_connect(
                        connect_req_ptr,
                        stream_handle_ptr,
                        ptr::addr_of(in_addr),
                        tcp_connect_on_connect_cb)
                  }
                  ip::ipv6(addr) {
                    log(debug, #fmt("addr: %?", addr));
                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                    uv::ll::tcp_connect6(
                        connect_req_ptr,
                        stream_handle_ptr,
                        ptr::addr_of(in_addr),
                        tcp_connect_on_connect_cb)
                  }
                };
                alt connect_result {
J
Jeff Olson 已提交
188 189 190 191 192
                  0i32 {
                    log(debug, "tcp_connect successful");
                    // reusable data that we'll have for the
                    // duration..
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
193 194
                                               socket_data_ptr as
                                                  *libc::c_void);
J
Jeff Olson 已提交
195 196 197 198 199 200 201 202 203 204 205 206 207
                    // just so the connect_cb can send the
                    // outcome..
                    uv::ll::set_data_for_req(connect_req_ptr,
                                             conn_data_ptr);
                    log(debug, "leaving tcp_connect interact cb...");
                    // let tcp_connect_on_connect_cb send on
                    // the result_ch, now..
                  }
                  _ {
                    // immediate connect failure.. probably a garbage
                    // ip or somesuch
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
                    comm::send((*conn_data_ptr).result_ch,
208
                               conn_failure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
209 210 211 212 213 214 215 216 217 218 219 220
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                                   conn_data_ptr);
                    uv::ll::close(stream_handle_ptr, stream_error_close_cb);
                  }
                }
              }
            }
        }
          _ {
            // failure to create a tcp handle
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send((*conn_data_ptr).result_ch,
221
                       conn_failure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
222 223 224 225 226 227
          }
        }
    };
    alt comm::recv(result_po) {
      conn_success {
        log(debug, "tcp::connect - received success on result_po");
228
        result::ok(tcp_socket(socket_data))
J
Jeff Olson 已提交
229 230 231 232
      }
      conn_failure(err_data) {
        comm::recv(closed_signal_po);
        log(debug, "tcp::connect - received failure on result_po");
233 234 235 236 237 238 239 240
        // still have to free the malloc'd stream handle..
        rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                           as *libc::c_void);
        let tcp_conn_err = alt err_data.err_name {
          "ECONNREFUSED" { connection_refused }
          _ { generic_connect_err(err_data.err_name, err_data.err_msg) }
        };
        result::err(tcp_conn_err)
J
Jeff Olson 已提交
241 242 243
      }
    }
}
244

245 246 247 248 249 250
/**
 * Write binary data to a tcp stream; Blocks until operation completes
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
251
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
252 253 254 255 256 257 258
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `result` object with a `nil` value as the `ok` variant, or a
 * `tcp_err_data` value as the `err` variant
 */
259
fn write(sock: tcp_socket, raw_write_data: ~[u8])
260
    -> result::result<(), tcp_err_data> unsafe {
261
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
262 263 264
    write_common_impl(socket_data_ptr, raw_write_data)
}

265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
/**
 * Write binary data to tcp stream; Returns a `future::future` value
 * immediately
 *
 * # Safety
 *
 * This function can produce unsafe results if:
 *
 * 1. the call to `write_future` is made
 * 2. the `future::future` value returned is never resolved via
 * `future::get`
 * 3. and then the `tcp_socket` passed in to `write_future` leaves
 * scope and is destructed before the task that runs the libuv write
 * operation completes.
 *
 * As such: If using `write_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released write
 * handle. Otherwise, use the blocking `tcp::write` function instead.
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
287
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
288 289 290 291 292 293 294 295
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `future` value that, once the `write` operation completes, resolves to a
 * `result` object with a `nil` value as the `ok` variant, or a `tcp_err_data`
 * value as the `err` variant
 */
296
fn write_future(sock: tcp_socket, raw_write_data: ~[u8])
297
    -> future::future<result::result<(), tcp_err_data>> unsafe {
298
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
299
    do future_spawn {
300
        let data_copy = copy(raw_write_data);
301
        write_common_impl(socket_data_ptr, data_copy)
302
    }
303 304
}

305 306 307 308 309 310 311 312 313 314 315 316 317 318
/**
 * Begin reading binary data from an open TCP connection; used with
 * `read_stop`
 *
 * # Arguments
 *
 * * sock -- a `net::tcp::tcp_socket` for the connection to read from
 *
 * # Returns
 *
 * * A `result` instance that will either contain a
 * `comm::port<tcp_read_result>` that the user can read (and optionally, loop
 * on) from until `read_stop` is called, or a `tcp_err_data` record
 */
319 320
fn read_start(sock: tcp_socket)
    -> result::result<comm::port<
321
        result::result<~[u8], tcp_err_data>>, tcp_err_data> unsafe {
322
    let socket_data = ptr::addr_of(*(sock.socket_data));
323
    read_start_common_impl(socket_data)
324
}
325

326 327 328 329 330 331 332
/**
 * Stop reading from an open TCP connection; used with `read_start`
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on
 */
333
fn read_stop(sock: tcp_socket,
334
             -read_port: comm::port<result::result<~[u8], tcp_err_data>>) ->
335
    result::result<(), tcp_err_data> unsafe {
336
    log(debug, #fmt("taking the read_port out of commission %?", read_port));
337
    let socket_data = ptr::addr_of(*sock.socket_data);
338 339 340
    read_stop_common_impl(socket_data)
}

341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
/**
 * Reads a single chunk of data from `tcp_socket`; block until data/error
 * recv'd
 *
 * Does a blocking read operation for a single chunk of data from a
 * `tcp_socket` until a data arrives or an error is received. The provided
 * `timeout_msecs` value is used to raise an error if the timeout period
 * passes without any data received.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
356
fn read(sock: tcp_socket, timeout_msecs: uint)
357
    -> result::result<~[u8],tcp_err_data> {
358
    let socket_data = ptr::addr_of(*(sock.socket_data));
359 360 361
    read_common_impl(socket_data, timeout_msecs)
}

362
/**
363
 * Reads a single chunk of data; returns a `future::future<~[u8]>`
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
 * immediately
 *
 * Does a non-blocking read operation for a single chunk of data from a
 * `tcp_socket` and immediately returns a `future` value representing the
 * result. When resolving the returned `future`, it will block until data
 * arrives or an error is received. The provided `timeout_msecs`
 * value is used to raise an error if the timeout period passes without any
 * data received.
 *
 * # Safety
 *
 * This function can produce unsafe results if the call to `read_future` is
 * made, the `future::future` value returned is never resolved via
 * `future::get`, and then the `tcp_socket` passed in to `read_future` leaves
 * scope and is destructed before the task that runs the libuv read
 * operation completes.
 *
 * As such: If using `read_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released read
 * handle. Otherwise, use the blocking `tcp::read` function instead.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
391
fn read_future(sock: tcp_socket, timeout_msecs: uint)
392
    -> future::future<result::result<~[u8],tcp_err_data>> {
393
    let socket_data = ptr::addr_of(*(sock.socket_data));
394
    do future_spawn {
395
        read_common_impl(socket_data, timeout_msecs)
396
    }
397
}
398

399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467
/**
 * Bind an incoming client connection to a `net::tcp::tcp_socket`
 *
 * # Notes
 *
 * It is safe to call `net::tcp::accept` _only_ within the context of the
 * `new_connect_cb` callback provided as the final argument to the
 * `net::tcp::listen` function.
 *
 * The `new_conn` opaque value is provided _only_ as the first argument to the
 * `new_connect_cb` provided as a part of `net::tcp::listen`.
 * It can be safely sent to another task but it _must_ be
 * used (via `net::tcp::accept`) before the `new_connect_cb` call it was
 * provided to returns.
 *
 * This implies that a port/chan pair must be used to make sure that the
 * `new_connect_cb` call blocks until an attempt to create a
 * `net::tcp::tcp_socket` is completed.
 *
 * # Example
 *
 * Here, the `new_conn` is used in conjunction with `accept` from within
 * a task spawned by the `new_connect_cb` passed into `listen`
 *
 * ~~~~~~~~~~~
 * net::tcp::listen(remote_ip, remote_port, backlog)
 *     // this callback is ran once after the connection is successfully
 *     // set up
 *     {|kill_ch|
 *       // pass the kill_ch to your main loop or wherever you want
 *       // to be able to externally kill the server from
 *     }
 *     // this callback is ran when a new connection arrives
 *     {|new_conn, kill_ch|
 *     let cont_po = comm::port::<option<tcp_err_data>>();
 *     let cont_ch = comm::chan(cont_po);
 *     task::spawn {||
 *         let accept_result = net::tcp::accept(new_conn);
 *         if accept_result.is_err() {
 *             comm::send(cont_ch, result::get_err(accept_result));
 *             // fail?
 *         }
 *         else {
 *             let sock = result::get(accept_result);
 *             comm::send(cont_ch, true);
 *             // do work here
 *         }
 *     };
 *     alt comm::recv(cont_po) {
 *       // shut down listen()
 *       some(err_data) { comm::send(kill_chan, some(err_data)) }
 *       // wait for next connection
 *       none {}
 *     }
 * };
 * ~~~~~~~~~~~
 *
 * # Arguments
 *
 * * `new_conn` - an opaque value used to create a new `tcp_socket`
 *
 * # Returns
 *
 * On success, this function will return a `net::tcp::tcp_socket` as the
 * `ok` variant of a `result`. The `net::tcp::tcp_socket` is anchored within
 * the task that `accept` was called within for its lifetime. On failure,
 * this function will return a `net::tcp::tcp_err_data` record
 * as the `err` variant of a `result`.
 */
468 469 470 471 472 473
fn accept(new_conn: tcp_new_connection)
    -> result::result<tcp_socket, tcp_err_data> unsafe {

    alt new_conn{
      new_tcp_conn(server_handle_ptr) {
        let server_data_ptr = uv::ll::get_data_for_uv_handle(
474
            server_handle_ptr) as *tcp_listen_fc_data;
475
        let reader_po = comm::port::<result::result<~[u8], tcp_err_data>>();
476
        let iotask = (*server_data_ptr).iotask;
477 478 479
        let stream_handle_ptr = malloc_uv_tcp_t();
        *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
        let client_socket_data = @{
480 481
            reader_po: reader_po,
            reader_ch: comm::chan(reader_po),
482
            stream_handle_ptr : stream_handle_ptr,
483 484
            connect_req : uv::ll::connect_t(),
            write_req : uv::ll::write_t(),
485
            iotask : iotask
486
        };
487 488 489
        let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
        let client_stream_handle_ptr =
            (*client_socket_data_ptr).stream_handle_ptr;
490 491 492

        let result_po = comm::port::<option<tcp_err_data>>();
        let result_ch = comm::chan(result_po);
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508

        // UNSAFE LIBUV INTERACTION BEGIN
        // .. normally this happens within the context of
        // a call to uv::hl::interact.. but we're breaking
        // the rules here because this always has to be
        // called within the context of a listen() new_connect_cb
        // callback (or it will likely fail and drown your cat)
        log(debug, "in interact cb for tcp::accept");
        let loop_ptr = uv::ll::get_loop_for_uv_handle(
            server_handle_ptr);
        alt uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
          0i32 {
            log(debug, "uv_tcp_init successful for client stream");
            alt uv::ll::accept(
                server_handle_ptr as *libc::c_void,
                client_stream_handle_ptr as *libc::c_void) {
509
              0i32 {
510 511
                log(debug, "successfully accepted client connection");
                uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
512 513
                                               client_socket_data_ptr
                                                   as *libc::c_void);
514
                comm::send(result_ch, none);
515 516
              }
              _ {
517
                log(debug, "failed to accept client conn");
518 519 520 521
                comm::send(result_ch, some(
                    uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
              }
            }
522 523 524 525 526 527 528 529
          }
          _ {
            log(debug, "failed to init client stream");
            comm::send(result_ch, some(
                uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
          }
        }
        // UNSAFE LIBUV INTERACTION END
530 531 532 533 534
        alt comm::recv(result_po) {
          some(err_data) {
            result::err(err_data)
          }
          none {
535
            result::ok(tcp_socket(client_socket_data))
536 537 538 539 540 541
          }
        }
      }
    }
}

542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569
/**
 * Bind to a given IP/port and listen for new connections
 *
 * # Arguments
 *
 * * `host_ip` - a `net::ip::ip_addr` representing a unique IP
 * (versions 4 or 6)
 * * `port` - a uint representing the port to listen on
 * * `backlog` - a uint representing the number of incoming connections
 * to cache in memory
 * * `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
 * * `on_establish_cb` - a callback that is evaluated if/when the listener
 * is successfully established. it takes no parameters
 * * `new_connect_cb` - a callback to be evaluated, on the libuv thread,
 * whenever a client attempts to conect on the provided ip/port. the
 * callback's arguments are:
 *     * `new_conn` - an opaque type that can be passed to
 *     `net::tcp::accept` in order to be converted to a `tcp_socket`.
 *     * `kill_ch` - channel of type `comm::chan<option<tcp_err_data>>`. this
 *     channel can be used to send a message to cause `listen` to begin
 *     closing the underlying libuv data structures.
 *
 * # returns
 *
 * a `result` instance containing empty data of type `()` on a
 * successful/normal shutdown, and a `tcp_listen_err_data` enum in the event
 * of listen exiting because of an error
 */
570
fn listen(-host_ip: ip::ip_addr, port: uint, backlog: uint,
571
          iotask: iotask,
572
          on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
573 574
          +new_connect_cb: fn~(tcp_new_connection,
                               comm::chan<option<tcp_err_data>>))
575
    -> result::result<(), tcp_listen_err_data> unsafe {
576
    do listen_common(host_ip, port, backlog, iotask, on_establish_cb)
577
        // on_connect_cb
578
        |handle| {
579 580 581 582 583 584 585 586
            let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
                as *tcp_listen_fc_data;
            let new_conn = new_tcp_conn(handle);
            let kill_ch = (*server_data_ptr).kill_ch;
            new_connect_cb(new_conn, kill_ch);
    }
}

587
fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint,
588 589 590
          iotask: iotask,
          on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
          -on_connect_cb: fn~(*uv::ll::uv_tcp_t))
591
    -> result::result<(), tcp_listen_err_data> unsafe {
592 593 594 595 596 597 598 599 600
    let stream_closed_po = comm::port::<()>();
    let kill_po = comm::port::<option<tcp_err_data>>();
    let kill_ch = comm::chan(kill_po);
    let server_stream = uv::ll::tcp_t();
    let server_stream_ptr = ptr::addr_of(server_stream);
    let server_data = {
        server_stream_ptr: server_stream_ptr,
        stream_closed_ch: comm::chan(stream_closed_po),
        kill_ch: kill_ch,
601
        on_connect_cb: on_connect_cb,
602
        iotask: iotask,
603 604 605 606
        mut active: true
    };
    let server_data_ptr = ptr::addr_of(server_data);

607
    let setup_result = do comm::listen |setup_ch| {
J
Jeff Olson 已提交
608
        // this is to address a compiler warning about
609 610 611 612 613 614
        // an implicit copy.. it seems that double nested
        // will defeat a move sigil, as is done to the host_ip
        // arg above.. this same pattern works w/o complaint in
        // tcp::connect (because the iotask::interact cb isn't
        // nested within a comm::listen block)
        let loc_ip = copy(host_ip);
615
        do iotask::interact(iotask) |loop_ptr| {
616
            alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
617
              0i32 {
618 619 620
                uv::ll::set_data_for_uv_handle(
                    server_stream_ptr,
                    server_data_ptr);
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636
                let addr_str = ip::format_addr(loc_ip);
                let bind_result = alt loc_ip {
                  ip::ipv4(addr) {
                    log(debug, #fmt("addr: %?", addr));
                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                    uv::ll::tcp_bind(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
                  ip::ipv6(addr) {
                    log(debug, #fmt("addr: %?", addr));
                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                    uv::ll::tcp_bind6(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
                };
                alt bind_result {
637
                  0i32 {
638 639 640 641 642 643 644 645 646 647 648 649
                    alt uv::ll::listen(server_stream_ptr,
                                       backlog as libc::c_int,
                                       tcp_lfc_on_connection_cb) {
                      0i32 {
                        comm::send(setup_ch, none);
                      }
                      _ {
                        log(debug, "failure to uv_listen()");
                        let err_data = uv::ll::get_last_err_data(loop_ptr);
                        comm::send(setup_ch, some(err_data));
                      }
                    }
650 651
                  }
                  _ {
652
                    log(debug, "failure to uv_tcp_bind");
653 654 655 656 657 658
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
                    comm::send(setup_ch, some(err_data));
                  }
                }
              }
              _ {
659
                log(debug, "failure to uv_tcp_init");
660 661 662 663
                let err_data = uv::ll::get_last_err_data(loop_ptr);
                comm::send(setup_ch, some(err_data));
              }
            }
664 665
        };
        setup_ch.recv()
666
    };
667
    alt setup_result {
668
      some(err_data) {
669
        do iotask::interact(iotask) |loop_ptr| {
670 671 672 673 674 675 676 677 678 679 680 681 682 683
            log(debug, #fmt("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
        stream_closed_po.recv();
        alt err_data.err_name {
          "EACCES" {
            log(debug, "Got EACCES error");
            result::err(access_denied)
          }
          "EADDRINUSE" {
            log(debug, "Got EADDRINUSE error");
            result::err(address_in_use)
684 685
          }
          _ {
686 687 688 689
            log(debug, #fmt("Got '%s' '%s' libuv error",
                            err_data.err_name, err_data.err_msg));
            result::err(
                generic_listen_err(err_data.err_name, err_data.err_msg))
690 691 692 693 694
          }
        }
      }
      none {
        on_establish_cb(kill_ch);
N
Niko Matsakis 已提交
695
        let kill_result = comm::recv(kill_po);
B
Brian Anderson 已提交
696
        do iotask::interact(iotask) |loop_ptr| {
697 698 699 700 701
            log(debug, #fmt("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
702
        stream_closed_po.recv();
703 704 705
        alt kill_result {
          // some failure post bind/listen
          some(err_data) {
706 707
            result::err(generic_listen_err(err_data.err_name,
                                            err_data.err_msg))
708 709 710 711 712 713 714 715 716 717
          }
          // clean exit
          none {
            result::ok(())
          }
        }
      }
    }
}

718 719 720 721 722 723 724 725 726 727 728 729 730 731 732
/**
 * Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`.
 *
 * This function takes ownership of a `net::tcp::tcp_socket`, returning it
 * stored within a buffered wrapper, which can be converted to a `io::reader`
 * or `io::writer`
 *
 * # Arguments
 *
 * * `sock` -- a `net::tcp::tcp_socket` that you want to buffer
 *
 * # Returns
 *
 * A buffered wrapper that you can cast as an `io::reader` or `io::writer`
 */
733
fn socket_buf(-sock: tcp_socket) -> tcp_socket_buf {
734
    tcp_socket_buf(@{ sock: sock, mut buf: ~[] })
735 736
}

737
/// Convenience methods extending `net::tcp::tcp_socket`
738
impl tcp_socket for tcp_socket {
739
    fn read_start() -> result::result<comm::port<
740
        result::result<~[u8], tcp_err_data>>, tcp_err_data> {
741 742
        read_start(self)
    }
743
    fn read_stop(-read_port:
744
                 comm::port<result::result<~[u8], tcp_err_data>>) ->
745
        result::result<(), tcp_err_data> {
746
        read_stop(self, read_port)
747
    }
748
    fn read(timeout_msecs: uint) ->
749
        result::result<~[u8], tcp_err_data> {
750 751 752
        read(self, timeout_msecs)
    }
    fn read_future(timeout_msecs: uint) ->
753
        future::future<result::result<~[u8], tcp_err_data>> {
754 755
        read_future(self, timeout_msecs)
    }
756
    fn write(raw_write_data: ~[u8])
757 758 759
        -> result::result<(), tcp_err_data> {
        write(self, raw_write_data)
    }
760
    fn write_future(raw_write_data: ~[u8])
761
        -> future::future<result::result<(), tcp_err_data>> {
762 763
        write_future(self, raw_write_data)
    }
764
}
765

766
/// Implementation of `io::reader` iface for a buffered `net::tcp::tcp_socket`
767
impl tcp_socket_buf of io::reader for @tcp_socket_buf {
768
    fn read_bytes(amt: uint) -> ~[u8] {
769
        let has_amt_available =
770
            vec::len((*(self.data)).buf) >= amt;
771 772
        if has_amt_available {
            // no arbitrary-length shift in vec::?
773
            let mut ret_buf = ~[];
774
            while vec::len(ret_buf) < amt {
775
                ret_buf += ~[vec::shift((*(self.data)).buf)];
776 777 778 779
            }
            ret_buf
        }
        else {
780
            let read_result = read((*(self.data)).sock, 0u);
781
            if read_result.is_err() {
782 783 784
                let err_data = read_result.get_err();
                log(debug, #fmt("ERROR sock_buf as io::reader.read err %? %?",
                                 err_data.err_name, err_data.err_msg));
785
                ~[]
786 787 788
            }
            else {
                let new_chunk = result::unwrap(read_result);
789
                (*(self.data)).buf += new_chunk;
790 791 792 793 794 795 796 797
                self.read_bytes(amt)
            }
        }
    }
    fn read_byte() -> int {
        self.read_bytes(1u)[0] as int
    }
    fn unread_byte(amt: int) {
798
        vec::unshift((*(self.data)).buf, amt as u8);
799 800 801 802 803 804 805 806 807 808 809 810 811
    }
    fn eof() -> bool {
        false // noop
    }
    fn seek(dist: int, seek: io::seek_style) {
        log(debug, #fmt("tcp_socket_buf seek stub %? %?", dist, seek));
        // noop
    }
    fn tell() -> uint {
        0u // noop
    }
}

812
/// Implementation of `io::reader` iface for a buffered `net::tcp::tcp_socket`
813
impl tcp_socket_buf of io::writer for @tcp_socket_buf {
814
    fn write(data: &[const u8]) unsafe {
815 816
        let socket_data_ptr =
            ptr::addr_of(*((*(self.data)).sock).socket_data);
817 818
        let w_result = write_common_impl(socket_data_ptr,
                                        vec::slice(data, 0, vec::len(data)));
819
        if w_result.is_err() {
820 821 822 823
            let err_data = w_result.get_err();
            log(debug, #fmt("ERROR sock_buf as io::writer.writer err: %? %?",
                             err_data.err_name, err_data.err_msg));
        }
824 825 826 827 828 829 830 831 832 833 834 835 836
    }
    fn seek(dist: int, seek: io::seek_style) {
      log(debug, #fmt("tcp_socket_buf seek stub %? %?", dist, seek));
        // noop
    }
    fn tell() -> uint {
        0u
    }
    fn flush() -> int {
        0
    }
}

837 838
// INTERNAL API

839 840 841 842 843 844 845 846
fn tear_down_socket_data(socket_data: @tcp_socket_data) unsafe {
    let closed_po = comm::port::<()>();
    let closed_ch = comm::chan(closed_po);
    let close_data = {
        closed_ch: closed_ch
    };
    let close_data_ptr = ptr::addr_of(close_data);
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
847
    do iotask::interact((*socket_data).iotask) |loop_ptr| {
848 849 850 851 852 853 854 855 856 857 858 859 860
        log(debug, #fmt("interact dtor for tcp_socket stream %? loop %?",
            stream_handle_ptr, loop_ptr));
        uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                       close_data_ptr);
        uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
    };
    comm::recv(closed_po);
    log(debug, #fmt("about to free socket_data at %?", socket_data));
    rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                       as *libc::c_void);
    log(debug, "exiting dtor for tcp_socket");
}

861 862
// shared implementation for tcp::read
fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
863
    -> result::result<~[u8],tcp_err_data> unsafe {
864
    log(debug, "starting tcp::read");
865
    let iotask = (*socket_data).iotask;
866
    let rs_result = read_start_common_impl(socket_data);
867
    if result::is_err(rs_result) {
868 869 870 871 872 873 874
        let err_data = result::get_err(rs_result);
        result::err(err_data)
    }
    else {
        log(debug, "tcp::read before recv_timeout");
        let read_result = if timeout_msecs > 0u {
            timer::recv_timeout(
875
               iotask, timeout_msecs, result::get(rs_result))
876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904
        } else {
            some(comm::recv(result::get(rs_result)))
        };
        log(debug, "tcp::read after recv_timeout");
        alt read_result {
          none {
            log(debug, "tcp::read: timed out..");
            let err_data = {
                err_name: "TIMEOUT",
                err_msg: "req timed out"
            };
            read_stop_common_impl(socket_data);
            result::err(err_data)
          }
          some(data_result) {
            log(debug, "tcp::read got data");
            read_stop_common_impl(socket_data);
            data_result
          }
        }
    }
}

// shared impl for read_stop
fn read_stop_common_impl(socket_data: *tcp_socket_data) ->
    result::result<(), tcp_err_data> unsafe {
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
    let stop_po = comm::port::<option<tcp_err_data>>();
    let stop_ch = comm::chan(stop_po);
B
Brian Anderson 已提交
905
    do iotask::interact((*socket_data).iotask) |loop_ptr| {
906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931
        log(debug, "in interact cb for tcp::read_stop");
        alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
          0i32 {
            log(debug, "successfully called uv_read_stop");
            comm::send(stop_ch, none);
          }
          _ {
            log(debug, "failure in calling uv_read_stop");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send(stop_ch, some(err_data.to_tcp_err()));
          }
        }
    };
    alt comm::recv(stop_po) {
      some(err_data) {
        result::err(err_data.to_tcp_err())
      }
      none {
        result::ok(())
      }
    }
}

// shared impl for read_start
fn read_start_common_impl(socket_data: *tcp_socket_data)
    -> result::result<comm::port<
932
        result::result<~[u8], tcp_err_data>>, tcp_err_data> unsafe {
933 934 935 936
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
    let start_po = comm::port::<option<uv::ll::uv_err_data>>();
    let start_ch = comm::chan(start_po);
    log(debug, "in tcp::read_start before interact loop");
B
Brian Anderson 已提交
937
    do iotask::interact((*socket_data).iotask) |loop_ptr| {
938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962
        log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
        alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
                               on_alloc_cb,
                               on_tcp_read_cb) {
          0i32 {
            log(debug, "success doing uv_read_start");
            comm::send(start_ch, none);
          }
          _ {
            log(debug, "error attempting uv_read_start");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send(start_ch, some(err_data));
          }
        }
    };
    alt comm::recv(start_po) {
      some(err_data) {
        result::err(err_data.to_tcp_err())
      }
      none {
        result::ok((*socket_data).reader_po)
      }
    }
}

963 964
// helper to convert a "class" vector of [u8] to a *[uv::ll::uv_buf_t]

965 966
// shared implementation used by write and write_future
fn write_common_impl(socket_data_ptr: *tcp_socket_data,
967
                     raw_write_data: ~[u8])
968 969 970 971
    -> result::result<(), tcp_err_data> unsafe {
    let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
    let stream_handle_ptr =
        (*socket_data_ptr).stream_handle_ptr;
972
    let write_buf_vec =  ~[ uv::ll::buf_init(
973
        vec::unsafe::to_ptr(raw_write_data),
974
        vec::len(raw_write_data)) ];
975 976 977 978 979 980
    let write_buf_vec_ptr = ptr::addr_of(write_buf_vec);
    let result_po = comm::port::<tcp_write_result>();
    let write_data = {
        result_ch: comm::chan(result_po)
    };
    let write_data_ptr = ptr::addr_of(write_data);
B
Brian Anderson 已提交
981
    do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| {
982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998
        log(debug, #fmt("in interact cb for tcp::write %?", loop_ptr));
        alt uv::ll::write(write_req_ptr,
                          stream_handle_ptr,
                          write_buf_vec_ptr,
                          tcp_write_complete_cb) {
          0i32 {
            log(debug, "uv_write() invoked successfully");
            uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
          }
          _ {
            log(debug, "error invoking uv_write()");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send((*write_data_ptr).result_ch,
                       tcp_write_error(err_data.to_tcp_err()));
          }
        }
    };
999 1000 1001 1002
    // FIXME (#2656): Instead of passing unsafe pointers to local data,
    // and waiting here for the write to complete, we should transfer
    // ownership of everything to the I/O task and let it deal with the
    // aftermath, so we don't have to sit here blocking.
1003 1004 1005 1006 1007 1008
    alt comm::recv(result_po) {
      tcp_write_success { result::ok(()) }
      tcp_write_error(err_data) { result::err(err_data.to_tcp_err()) }
    }
}

1009 1010 1011 1012
enum tcp_new_connection {
    new_tcp_conn(*uv::ll::uv_tcp_t)
}

1013
type tcp_listen_fc_data = {
1014 1015 1016
    server_stream_ptr: *uv::ll::uv_tcp_t,
    stream_closed_ch: comm::chan<()>,
    kill_ch: comm::chan<option<tcp_err_data>>,
1017
    on_connect_cb: fn~(*uv::ll::uv_tcp_t),
1018
    iotask: iotask,
1019 1020 1021
    mut active: bool
};

G
Graydon Hoare 已提交
1022
extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
1023
    let server_data_ptr = uv::ll::get_data_for_uv_handle(
1024
        handle) as *tcp_listen_fc_data;
1025 1026 1027
    comm::send((*server_data_ptr).stream_closed_ch, ());
}

G
Graydon Hoare 已提交
1028
extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
1029 1030
                                     status: libc::c_int) unsafe {
    let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
1031
        as *tcp_listen_fc_data;
1032
    let kill_ch = (*server_data_ptr).kill_ch;
1033
    if (*server_data_ptr).active {
1034 1035
        alt status {
          0i32 {
1036
            (*server_data_ptr).on_connect_cb(handle);
1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048
          }
          _ {
            let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
            comm::send(kill_ch,
                       some(uv::ll::get_last_err_data(loop_ptr)
                            .to_tcp_err()));
            (*server_data_ptr).active = false;
          }
        }
    }
}

1049
fn malloc_uv_tcp_t() -> *uv::ll::uv_tcp_t unsafe {
1050
    rustrt::rust_uv_current_kernel_malloc(
1051
        rustrt::rust_uv_helper_uv_tcp_t_size()) as *uv::ll::uv_tcp_t
1052 1053
}

1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069
enum tcp_connect_result {
    tcp_connected(tcp_socket),
    tcp_connect_error(tcp_err_data)
}

enum tcp_write_result {
    tcp_write_success,
    tcp_write_error(tcp_err_data)
}

enum tcp_read_start_result {
    tcp_read_start_success(comm::port<tcp_read_result>),
    tcp_read_start_error(tcp_err_data)
}

enum tcp_read_result {
1070
    tcp_read_data(~[u8]),
1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084
    tcp_read_done,
    tcp_read_err(tcp_err_data)
}

iface to_tcp_err_iface {
    fn to_tcp_err() -> tcp_err_data;
}

impl of to_tcp_err_iface for uv::ll::uv_err_data {
    fn to_tcp_err() -> tcp_err_data {
        { err_name: self.err_name, err_msg: self.err_msg }
    }
}

G
Graydon Hoare 已提交
1085
extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
1086 1087
                    nread: libc::ssize_t,
                    ++buf: uv::ll::uv_buf_t) unsafe {
1088 1089
    log(debug, #fmt("entering on_tcp_read_cb stream: %? nread: %?",
                    stream, nread));
1090 1091 1092
    let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
    let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
        as *tcp_socket_data;
1093
    alt nread as int {
1094 1095
      // incoming err.. probably eof
      -1 {
1096 1097 1098 1099 1100
        let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
        log(debug, #fmt("on_tcp_read_cb: incoming err.. name %? msg %?",
                        err_data.err_name, err_data.err_msg));
        let reader_ch = (*socket_data_ptr).reader_ch;
        comm::send(reader_ch, result::err(err_data));
1101 1102 1103 1104 1105 1106
      }
      // do nothing .. unneeded buf
      0 {}
      // have data
      _ {
        // we have data
1107
        log(debug, #fmt("tcp on_read_cb nread: %d", nread as int));
1108
        let reader_ch = (*socket_data_ptr).reader_ch;
1109 1110
        let buf_base = uv::ll::get_base_from_buf(buf);
        let buf_len = uv::ll::get_len_from_buf(buf);
1111
        let new_bytes = vec::unsafe::from_buf(buf_base, buf_len as uint);
1112
        comm::send(reader_ch, result::ok(new_bytes));
1113 1114 1115
      }
    }
    uv::ll::free_base_of_buf(buf);
1116
    log(debug, "exiting on_tcp_read_cb");
1117 1118
}

G
Graydon Hoare 已提交
1119
extern fn on_alloc_cb(handle: *libc::c_void,
1120
                     ++suggested_size: size_t)
1121 1122 1123 1124 1125 1126 1127
    -> uv::ll::uv_buf_t unsafe {
    log(debug, "tcp read on_alloc_cb!");
    let char_ptr = uv::ll::malloc_buf_base_of(suggested_size);
    log(debug, #fmt("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
                     handle,
                     char_ptr as uint,
                     suggested_size as uint));
1128
    uv::ll::buf_init(char_ptr, suggested_size as uint)
1129
}
1130 1131 1132 1133 1134

type tcp_socket_close_data = {
    closed_ch: comm::chan<()>
};

G
Graydon Hoare 已提交
1135
extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
1136 1137 1138 1139 1140 1141 1142
    let data = uv::ll::get_data_for_uv_handle(handle)
        as *tcp_socket_close_data;
    let closed_ch = (*data).closed_ch;
    comm::send(closed_ch, ());
    log(debug, "tcp_socket_dtor_close_cb exiting..");
}

G
Graydon Hoare 已提交
1143
extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
1144 1145 1146
                              status: libc::c_int) unsafe {
    let write_data_ptr = uv::ll::get_data_for_req(write_req)
        as *write_req_data;
1147
    if status == 0i32 {
1148 1149
        log(debug, "successful write complete");
        comm::send((*write_data_ptr).result_ch, tcp_write_success);
1150
    } else {
1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163
        let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
            write_req);
        let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        log(debug, "failure to write");
        comm::send((*write_data_ptr).result_ch, tcp_write_error(err_data));
    }
}

type write_req_data = {
    result_ch: comm::chan<tcp_write_result>
};

J
Jeff Olson 已提交
1164 1165 1166 1167 1168
type connect_req_data = {
    result_ch: comm::chan<conn_attempt>,
    closed_signal_ch: comm::chan<()>
};

G
Graydon Hoare 已提交
1169
extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
J
Jeff Olson 已提交
1170 1171 1172 1173 1174 1175
    let data = uv::ll::get_data_for_uv_handle(handle) as
        *connect_req_data;
    comm::send((*data).closed_signal_ch, ());
    log(debug, #fmt("exiting steam_error_close_cb for %?", handle));
}

G
Graydon Hoare 已提交
1176
extern fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
J
Jeff Olson 已提交
1177 1178 1179
    log(debug, #fmt("closed client tcp handle %?", handle));
}

G
Graydon Hoare 已提交
1180
extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
J
Jeff Olson 已提交
1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213
                                   status: libc::c_int) unsafe {
    let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
                      as *connect_req_data);
    let result_ch = (*conn_data_ptr).result_ch;
    log(debug, #fmt("tcp_connect result_ch %?", result_ch));
    let tcp_stream_ptr =
        uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
    alt status {
      0i32 {
        log(debug, "successful tcp connection!");
        comm::send(result_ch, conn_success);
      }
      _ {
        log(debug, "error in tcp_connect_on_connect_cb");
        let loop_ptr = uv::ll::get_loop_for_uv_handle(tcp_stream_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        log(debug, #fmt("err_data %? %?", err_data.err_name,
                        err_data.err_msg));
        comm::send(result_ch, conn_failure(err_data));
        uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
                                       conn_data_ptr);
        uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
      }
    }
    log(debug, "leaving tcp_connect_on_connect_cb");
}

enum conn_attempt {
    conn_success,
    conn_failure(uv::ll::uv_err_data)
}

type tcp_socket_data = {
1214 1215
    reader_po: comm::port<result::result<~[u8], tcp_err_data>>,
    reader_ch: comm::chan<result::result<~[u8], tcp_err_data>>,
1216
    stream_handle_ptr: *uv::ll::uv_tcp_t,
J
Jeff Olson 已提交
1217
    connect_req: uv::ll::uv_connect_t,
1218
    write_req: uv::ll::uv_write_t,
1219
    iotask: iotask
J
Jeff Olson 已提交
1220 1221
};

1222 1223
type tcp_buffered_socket_data = {
    sock: tcp_socket,
1224
    mut buf: ~[u8]
1225
};
J
Jeff Olson 已提交
1226

1227
//#[cfg(test)]
1228
mod test {
1229
    // FIXME don't run on fbsd or linux 32 bit (#2064)
1230 1231 1232 1233 1234 1235 1236 1237 1238 1239
    #[cfg(target_os="win32")]
    #[cfg(target_os="darwin")]
    #[cfg(target_os="linux")]
    mod tcp_ipv4_server_and_client_test {
        #[cfg(target_arch="x86_64")]
        mod impl64 {
            #[test]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1240
            #[test]
1241 1242 1243
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1244 1245 1246 1247 1248 1249 1250
            #[test]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
1251
            }
1252 1253 1254 1255
            #[test]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
            }
1256

1257 1258 1259 1260 1261 1262 1263 1264
        }
        #[cfg(target_arch="x86")]
        mod impl32 {
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1265 1266
            #[test]
            #[ignore(cfg(target_os = "linux"))]
1267 1268 1269
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1270 1271 1272 1273 1274 1275 1276
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            #[ignore(cfg(target_os = "linux"))]
1277
            #[ignore(cfg(windows), reason = "deadlocking bots")]
1278 1279 1280
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
            }
1281 1282 1283 1284
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
1285
            }
1286
        }
1287
    }
1288
    fn impl_gl_tcp_ipv4_server_and_client() {
1289
        let hl_loop = uv::global_loop::get();
1290 1291
        let server_ip = "127.0.0.1";
        let server_port = 8888u;
1292 1293 1294 1295 1296
        let expected_req = "ping";
        let expected_resp = "pong";

        let server_result_po = comm::port::<str>();
        let server_result_ch = comm::chan(server_result_po);
1297 1298 1299

        let cont_po = comm::port::<()>();
        let cont_ch = comm::chan(cont_po);
1300
        // server
1301
        do task::spawn_sched(task::manual_threads(1u)) {
B
Brian Anderson 已提交
1302
            let actual_req = do comm::listen |server_ch| {
1303 1304 1305 1306
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
1307
                    server_ch,
1308 1309
                    cont_ch,
                    hl_loop)
1310 1311 1312
            };
            server_result_ch.send(actual_req);
        };
1313
        comm::recv(cont_po);
1314 1315
        // client
        log(debug, "server started, firing up client..");
1316
        let actual_resp_result = do comm::listen |client_ch| {
1317 1318 1319 1320
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1321 1322
                client_ch,
                hl_loop)
1323
        };
1324
        assert actual_resp_result.is_ok();
1325
        let actual_resp = actual_resp_result.get();
1326 1327 1328 1329 1330 1331 1332
        let actual_req = comm::recv(server_result_po);
        log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, #fmt("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
1333
    }
1334
    fn impl_gl_tcp_ipv4_client_error_connection_refused() {
1335
        let hl_loop = uv::global_loop::get();
1336 1337 1338
        let server_ip = "127.0.0.1";
        let server_port = 8889u;
        let expected_req = "ping";
1339 1340
        // client
        log(debug, "firing up client..");
1341
        let actual_resp_result = do comm::listen |client_ch| {
1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
                client_ch,
                hl_loop)
        };
        alt actual_resp_result.get_err() {
          connection_refused {
          }
          _ {
            fail "unknown error.. expected connection_refused"
          }
        }
    }
1357 1358 1359 1360 1361
    fn impl_gl_tcp_ipv4_server_address_in_use() {
        let hl_loop = uv::global_loop::get();
        let server_ip = "127.0.0.1";
        let server_port = 8890u;
        let expected_req = "ping";
1362 1363 1364 1365 1366 1367 1368 1369
        let expected_resp = "pong";

        let server_result_po = comm::port::<str>();
        let server_result_ch = comm::chan(server_result_po);

        let cont_po = comm::port::<()>();
        let cont_ch = comm::chan(cont_po);
        // server
1370
        do task::spawn_sched(task::manual_threads(1u)) {
B
Brian Anderson 已提交
1371
            let actual_req = do comm::listen |server_ch| {
1372
                run_tcp_test_server(
1373 1374 1375 1376
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
1377 1378
                    cont_ch,
                    hl_loop)
1379 1380 1381 1382
            };
            server_result_ch.send(actual_req);
        };
        comm::recv(cont_po);
1383 1384 1385 1386 1387 1388
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        // client.. just doing this so that the first server tears down
1389
        log(debug, "server started, firing up client..");
1390
        do comm::listen |client_ch| {
1391 1392 1393 1394
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1395 1396
                client_ch,
                hl_loop)
1397
        };
1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426
        alt listen_err {
          address_in_use {
            assert true;
          }
          _ {
            fail "expected address_in_use listen error,"+
                      "but got a different error varient. check logs.";
          }
        }
    }
    fn impl_gl_tcp_ipv4_server_access_denied() {
        let hl_loop = uv::global_loop::get();
        let server_ip = "127.0.0.1";
        let server_port = 80u;
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        alt listen_err {
          access_denied {
            assert true;
          }
          _ {
            fail "expected address_in_use listen error,"+
                      "but got a different error varient. check logs.";
          }
        }
    }
1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439
    fn impl_gl_tcp_ipv4_server_client_reader_writer() {
        let iotask = uv::global_loop::get();
        let server_ip = "127.0.0.1";
        let server_port = 8891u;
        let expected_req = "ping";
        let expected_resp = "pong";

        let server_result_po = comm::port::<str>();
        let server_result_ch = comm::chan(server_result_po);

        let cont_po = comm::port::<()>();
        let cont_ch = comm::chan(cont_po);
        // server
1440
        do task::spawn_sched(task::manual_threads(1u)) {
1441
            let actual_req = do comm::listen |server_ch| {
1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
                    cont_ch,
                    iotask)
            };
            server_result_ch.send(actual_req);
        };
        comm::recv(cont_po);
        // client
        let server_addr = ip::v4::parse_addr(server_ip);
        let conn_result = connect(server_addr, server_port, iotask);
1456
        if result::is_err(conn_result) {
1457 1458 1459 1460 1461 1462
            assert false;
        }
        let sock_buf = @socket_buf(result::unwrap(conn_result));
        buf_write(sock_buf as io::writer, expected_req);

        // so contrived!
1463
        let actual_resp = do str::as_bytes(expected_resp) |resp_buf| {
1464 1465 1466
            buf_read(sock_buf as io::reader,
                     vec::len(resp_buf))
        };
1467

1468 1469 1470 1471 1472 1473 1474 1475
        let actual_req = comm::recv(server_result_po);
        log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, #fmt("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
    }
1476

1477 1478
    fn buf_write(+w: io::writer, val: str) {
        log(debug, #fmt("BUF_WRITE: val len %?", str::len(val)));
1479
        do str::byte_slice(val) |b_slice| {
1480 1481 1482
            log(debug, #fmt("BUF_WRITE: b_slice len %?",
                            vec::len(b_slice)));
            w.write(b_slice)
1483 1484 1485 1486 1487 1488 1489 1490 1491
        }
    }

    fn buf_read(+r: io::reader, len: uint) -> str {
        let new_bytes = r.read_bytes(len);
        log(debug, #fmt("in buf_read.. new_bytes len: %?",
                        vec::len(new_bytes)));
        str::from_bytes(new_bytes)
    }
1492

1493
    fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str,
1494
                          server_ch: comm::chan<str>,
1495
                          cont_ch: comm::chan<()>,
1496
                          iotask: iotask) -> str {
1497 1498 1499
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1500
            |kill_ch| {
1501 1502 1503 1504 1505 1506
                log(debug, #fmt("establish_cb %?",
                    kill_ch));
                comm::send(cont_ch, ());
            },
            // risky to run this on the loop, but some users
            // will want the POWER
1507
            |new_conn, kill_ch| {
1508
            log(debug, "SERVER: new connection!");
1509
            do comm::listen |cont_ch| {
1510
                do task::spawn_sched(task::manual_threads(1u)) {
1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532
                    log(debug, "SERVER: starting worker for new req");

                    let accept_result = accept(new_conn);
                    log(debug, "SERVER: after accept()");
                    if result::is_err(accept_result) {
                        log(debug, "SERVER: error accept connection");
                        let err_data = result::get_err(accept_result);
                        comm::send(kill_ch, some(err_data));
                        log(debug,
                            "SERVER/WORKER: send on err cont ch");
                        cont_ch.send(());
                    }
                    else {
                        log(debug,
                            "SERVER/WORKER: send on cont ch");
                        cont_ch.send(());
                        let sock = result::unwrap(accept_result);
                        log(debug, "SERVER: successfully accepted"+
                            "connection!");
                        let received_req_bytes = read(sock, 0u);
                        alt received_req_bytes {
                          result::ok(data) {
1533 1534 1535
                            log(debug, "SERVER: got REQ str::from_bytes..");
                            log(debug, #fmt("SERVER: REQ data len: %?",
                                            vec::len(data)));
1536 1537 1538 1539 1540 1541 1542 1543 1544 1545
                            server_ch.send(
                                str::from_bytes(data));
                            log(debug, "SERVER: before write");
                            tcp_write_single(sock, str::bytes(resp));
                            log(debug, "SERVER: after write.. die");
                            comm::send(kill_ch, none);
                          }
                          result::err(err_data) {
                            log(debug, #fmt("SERVER: error recvd: %s %s",
                                err_data.err_name, err_data.err_msg));
1546
                            comm::send(kill_ch, some(err_data));
1547 1548
                            server_ch.send("");
                          }
1549
                        }
1550
                        log(debug, "SERVER: worker spinning down");
1551
                    }
1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563
                }
                log(debug, "SERVER: waiting to recv on cont_ch");
                cont_ch.recv()
            };
            log(debug, "SERVER: recv'd on cont_ch..leaving listen cb");
        });
        // err check on listen_result
        if result::is_err(listen_result) {
            alt result::get_err(listen_result) {
              generic_listen_err(name, msg) {
                fail #fmt("SERVER: exited abnormally name %s msg %s",
                                name, msg);
1564
              }
1565 1566 1567 1568 1569
              access_denied {
                fail "SERVER: exited abnormally, got access denied..";
              }
              address_in_use {
                fail "SERVER: exited abnormally, got address in use...";
1570
              }
1571
            }
1572
        }
1573 1574 1575 1576 1577
        let ret_val = server_ch.recv();
        log(debug, #fmt("SERVER: exited and got ret val: '%s'", ret_val));
        ret_val
    }

1578 1579 1580 1581 1582
    fn run_tcp_test_server_fail(server_ip: str, server_port: uint,
                          iotask: iotask) -> tcp_listen_err_data {
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1583
            |kill_ch| {
1584 1585 1586
                log(debug, #fmt("establish_cb %?",
                    kill_ch));
            },
1587
            |new_conn, kill_ch| {
1588 1589 1590 1591
                fail #fmt("SERVER: shouldn't be called.. %? %?",
                           new_conn, kill_ch);
        });
        // err check on listen_result
1592
        if result::is_err(listen_result) {
1593 1594 1595 1596 1597 1598 1599
            result::get_err(listen_result)
        }
        else {
            fail "SERVER: did not fail as expected"
        }
    }

1600
    fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str,
1601
                          client_ch: comm::chan<str>,
1602 1603
                          iotask: iotask) -> result::result<str,
                                                    tcp_connect_err_data> {
1604
        let server_ip_addr = ip::v4::parse_addr(server_ip);
1605

1606
        log(debug, "CLIENT: starting..");
1607
        let connect_result = connect(server_ip_addr, server_port, iotask);
1608
        if result::is_err(connect_result) {
1609
            log(debug, "CLIENT: failed to connect");
1610
            let err_data = result::get_err(connect_result);
1611
            err(err_data)
1612
        }
1613 1614 1615 1616
        else {
            let sock = result::unwrap(connect_result);
            let resp_bytes = str::bytes(resp);
            tcp_write_single(sock, resp_bytes);
1617
            let read_result = sock.read(0u);
1618
            if read_result.is_err() {
1619
                log(debug, "CLIENT: failure to read");
1620
                ok("")
1621
            }
1622 1623 1624 1625 1626
            else {
                client_ch.send(str::from_bytes(read_result.get()));
                let ret_val = client_ch.recv();
                log(debug, #fmt("CLIENT: after client_ch recv ret: '%s'",
                   ret_val));
1627
                ok(ret_val)
1628 1629
            }
        }
1630 1631
    }

1632
    fn tcp_write_single(sock: tcp_socket, val: ~[u8]) {
1633 1634
        let write_result_future = sock.write_future(val);
        let write_result = write_result_future.get();
1635
        if result::is_err(write_result) {
1636
            log(debug, "tcp_write_single: write failed!");
1637
            let err_data = result::get_err(write_result);
1638 1639 1640 1641
            log(debug, #fmt("tcp_write_single err name: %s msg: %s",
                err_data.err_name, err_data.err_msg));
            // meh. torn on what to do here.
            fail "tcp_write_single failed";
1642
        }
1643
    }
1644
}