net_tcp.rs 59.1 KB
Newer Older
1
//! High-level interface to libuv's TCP functionality
2

J
Jeff Olson 已提交
3
import ip = net_ip;
4 5
import uv::iotask;
import uv::iotask::iotask;
6
import comm::methods;
7
import future_spawn = future::spawn;
8
import future::extensions;
J
Jeff Olson 已提交
9 10
// FIXME #1935
// should be able to, but can't atm, replace w/ result::{result, extensions};
11 12 13
import result::*;
import libc::size_t;
import str::extensions;
14
import io::{reader, reader_util, writer};
J
Jeff Olson 已提交
15

16
// tcp interfaces
17
export tcp_socket;
18 19
// buffered socket
export tcp_socket_buf, socket_buf;
20 21
// errors
export tcp_err_data, tcp_connect_err_data;
22
// operations on a tcp_socket
23
export write, write_future, read_start, read_stop;
24
// tcp server stuff
25
export listen, accept;
26 27
// tcp client stuff
export connect;
J
Jeff Olson 已提交
28

29
#[nolink]
30
extern mod rustrt {
31
    fn rust_uv_current_kernel_malloc(size: libc::c_uint) -> *libc::c_void;
32
    fn rust_uv_current_kernel_free(mem: *libc::c_void);
33
    fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
34 35
}

36 37 38 39 40 41 42
/**
 * Encapsulates an open TCP/IP connection through libuv
 *
 * `tcp_socket` is non-copyable/sendable and automagically handles closing the
 * underlying libuv data structures when it goes out of scope. This is the
 * data structure that is used for read/write operations over a TCP stream.
 */
43 44 45 46
class tcp_socket {
  let socket_data: @tcp_socket_data;
  new(socket_data: @tcp_socket_data) { self.socket_data = socket_data; }
  drop {
47
    unsafe {
48
        tear_down_socket_data(self.socket_data)
49 50
    }
  }
J
Jeff Olson 已提交
51 52
}

53 54 55 56 57 58
/**
 * A buffered wrapper for `net::tcp::tcp_socket`
 *
 * It is created with a call to `net::tcp::socket_buf()` and has impls that
 * satisfy both the `io::reader` and `io::writer` ifaces.
 */
59 60 61
class tcp_socket_buf {
  let data: @tcp_buffered_socket_data;
  new(data: @tcp_buffered_socket_data) { self.data = data; }
62 63
}

64
/// Contains raw, string-based, error information returned from libuv
65
type tcp_err_data = {
66 67
    err_name: ~str,
    err_msg: ~str
68
};
69
/// Details returned as part of a `result::err` result from `tcp::listen`
70
enum tcp_listen_err_data {
71 72 73 74
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
75
    generic_listen_err(~str, ~str),
76 77 78 79 80 81 82
    /**
     * Failed to bind to the requested IP/Port, because it is already in use.
     *
     * # Possible Causes
     *
     * * Attempting to bind to a port already bound to another listener
     */
83
    address_in_use,
84 85 86 87 88 89 90 91 92 93
    /**
     * Request to bind to an IP/Port was denied by the system.
     *
     * # Possible Causes
     *
     * * Attemping to binding to an IP/Port as a non-Administrator
     *   on Windows Vista+
     * * Attempting to bind, as a non-priv'd
     *   user, to 'privileged' ports (< 1024) on *nix
     */
94 95
    access_denied
}
96
/// Details returned as part of a `result::err` result from `tcp::connect`
97
enum tcp_connect_err_data {
98 99 100 101
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
102
    generic_connect_err(~str, ~str),
103
    /// Invalid IP or invalid port
104 105
    connection_refused
}
106

107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
/**
 * Initiate a client connection over TCP/IP
 *
 * # Arguments
 *
 * * `input_ip` - The IP address (versions 4 or 6) of the remote host
 * * `port` - the unsigned integer of the desired remote host port
 * * `iotask` - a `uv::iotask` that the tcp request will run on
 *
 * # Returns
 *
 * A `result` that, if the operation succeeds, contains a
 * `net::net::tcp_socket` that can be used to send and receive data to/from
 * the remote host. In the event of failure, a
 * `net::tcp::tcp_connect_err_data` instance will be returned
 */
123
fn connect(-input_ip: ip::ip_addr, port: uint,
124
           iotask: iotask)
125
    -> result::result<tcp_socket, tcp_connect_err_data> unsafe {
J
Jeff Olson 已提交
126 127 128 129 130 131 132
    let result_po = comm::port::<conn_attempt>();
    let closed_signal_po = comm::port::<()>();
    let conn_data = {
        result_ch: comm::chan(result_po),
        closed_signal_ch: comm::chan(closed_signal_po)
    };
    let conn_data_ptr = ptr::addr_of(conn_data);
133
    let reader_po = comm::port::<result::result<~[u8], tcp_err_data>>();
134 135 136
    let stream_handle_ptr = malloc_uv_tcp_t();
    *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
    let socket_data = @{
137 138
        reader_po: reader_po,
        reader_ch: comm::chan(reader_po),
139 140 141
        stream_handle_ptr: stream_handle_ptr,
        connect_req: uv::ll::connect_t(),
        write_req: uv::ll::write_t(),
142
        iotask: iotask
J
Jeff Olson 已提交
143
    };
144
    let socket_data_ptr = ptr::addr_of(*socket_data);
145
    log(debug, fmt!{"tcp_connect result_ch %?", conn_data.result_ch});
J
Jeff Olson 已提交
146 147
    // get an unsafe representation of our stream_handle_ptr that
    // we can send into the interact cb to be handled in libuv..
148 149
    log(debug, fmt!{"stream_handle_ptr outside interact %?",
        stream_handle_ptr});
B
Brian Anderson 已提交
150
    do iotask::interact(iotask) |loop_ptr| {
151
        log(debug, ~"in interact cb for tcp client connect..");
152 153
        log(debug, fmt!{"stream_handle_ptr in interact %?",
            stream_handle_ptr});
J
Jeff Olson 已提交
154 155
        alt uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
          0i32 {
156
            log(debug, ~"tcp_init successful");
J
Jeff Olson 已提交
157 158
            alt input_ip {
              ipv4 {
159
                log(debug, ~"dealing w/ ipv4 connection..");
J
Jeff Olson 已提交
160 161
                let connect_req_ptr =
                    ptr::addr_of((*socket_data_ptr).connect_req);
162 163 164 165 166 167 168 169
                let addr_str = ip::format_addr(input_ip);
                let connect_result = alt input_ip {
                  ip::ipv4(addr) {
                    // have to "recreate" the sockaddr_in/6
                    // since the ip_addr discards the port
                    // info.. should probably add an additional
                    // rust type that actually is closer to
                    // what the libuv API expects (ip str + port num)
170
                    log(debug, fmt!{"addr: %?", addr});
171 172 173 174 175 176 177 178
                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                    uv::ll::tcp_connect(
                        connect_req_ptr,
                        stream_handle_ptr,
                        ptr::addr_of(in_addr),
                        tcp_connect_on_connect_cb)
                  }
                  ip::ipv6(addr) {
179
                    log(debug, fmt!{"addr: %?", addr});
180 181 182 183 184 185 186 187 188
                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                    uv::ll::tcp_connect6(
                        connect_req_ptr,
                        stream_handle_ptr,
                        ptr::addr_of(in_addr),
                        tcp_connect_on_connect_cb)
                  }
                };
                alt connect_result {
J
Jeff Olson 已提交
189
                  0i32 {
190
                    log(debug, ~"tcp_connect successful");
J
Jeff Olson 已提交
191 192 193
                    // reusable data that we'll have for the
                    // duration..
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
194 195
                                               socket_data_ptr as
                                                  *libc::c_void);
J
Jeff Olson 已提交
196 197 198 199
                    // just so the connect_cb can send the
                    // outcome..
                    uv::ll::set_data_for_req(connect_req_ptr,
                                             conn_data_ptr);
200
                    log(debug, ~"leaving tcp_connect interact cb...");
J
Jeff Olson 已提交
201 202 203 204 205 206 207 208
                    // let tcp_connect_on_connect_cb send on
                    // the result_ch, now..
                  }
                  _ {
                    // immediate connect failure.. probably a garbage
                    // ip or somesuch
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
                    comm::send((*conn_data_ptr).result_ch,
209
                               conn_failure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
210 211 212 213 214 215 216 217 218 219 220 221
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                                   conn_data_ptr);
                    uv::ll::close(stream_handle_ptr, stream_error_close_cb);
                  }
                }
              }
            }
        }
          _ {
            // failure to create a tcp handle
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send((*conn_data_ptr).result_ch,
222
                       conn_failure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
223 224 225 226 227
          }
        }
    };
    alt comm::recv(result_po) {
      conn_success {
228
        log(debug, ~"tcp::connect - received success on result_po");
229
        result::ok(tcp_socket(socket_data))
J
Jeff Olson 已提交
230 231 232
      }
      conn_failure(err_data) {
        comm::recv(closed_signal_po);
233
        log(debug, ~"tcp::connect - received failure on result_po");
234 235 236 237
        // still have to free the malloc'd stream handle..
        rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                           as *libc::c_void);
        let tcp_conn_err = alt err_data.err_name {
238
          ~"ECONNREFUSED" { connection_refused }
239 240 241
          _ { generic_connect_err(err_data.err_name, err_data.err_msg) }
        };
        result::err(tcp_conn_err)
J
Jeff Olson 已提交
242 243 244
      }
    }
}
245

246 247 248 249 250 251
/**
 * Write binary data to a tcp stream; Blocks until operation completes
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
252
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
253 254 255 256 257 258 259
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `result` object with a `nil` value as the `ok` variant, or a
 * `tcp_err_data` value as the `err` variant
 */
260
fn write(sock: tcp_socket, raw_write_data: ~[u8])
261
    -> result::result<(), tcp_err_data> unsafe {
262
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
263 264 265
    write_common_impl(socket_data_ptr, raw_write_data)
}

266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
/**
 * Write binary data to tcp stream; Returns a `future::future` value
 * immediately
 *
 * # Safety
 *
 * This function can produce unsafe results if:
 *
 * 1. the call to `write_future` is made
 * 2. the `future::future` value returned is never resolved via
 * `future::get`
 * 3. and then the `tcp_socket` passed in to `write_future` leaves
 * scope and is destructed before the task that runs the libuv write
 * operation completes.
 *
 * As such: If using `write_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released write
 * handle. Otherwise, use the blocking `tcp::write` function instead.
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
288
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
289 290 291 292 293 294 295 296
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `future` value that, once the `write` operation completes, resolves to a
 * `result` object with a `nil` value as the `ok` variant, or a `tcp_err_data`
 * value as the `err` variant
 */
297
fn write_future(sock: tcp_socket, raw_write_data: ~[u8])
298
    -> future::future<result::result<(), tcp_err_data>> unsafe {
299
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
300
    do future_spawn {
301
        let data_copy = copy(raw_write_data);
302
        write_common_impl(socket_data_ptr, data_copy)
303
    }
304 305
}

306 307 308 309 310 311 312 313 314 315 316 317 318 319
/**
 * Begin reading binary data from an open TCP connection; used with
 * `read_stop`
 *
 * # Arguments
 *
 * * sock -- a `net::tcp::tcp_socket` for the connection to read from
 *
 * # Returns
 *
 * * A `result` instance that will either contain a
 * `comm::port<tcp_read_result>` that the user can read (and optionally, loop
 * on) from until `read_stop` is called, or a `tcp_err_data` record
 */
320 321
fn read_start(sock: tcp_socket)
    -> result::result<comm::port<
322
        result::result<~[u8], tcp_err_data>>, tcp_err_data> unsafe {
323
    let socket_data = ptr::addr_of(*(sock.socket_data));
324
    read_start_common_impl(socket_data)
325
}
326

327 328 329 330 331 332 333
/**
 * Stop reading from an open TCP connection; used with `read_start`
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on
 */
334
fn read_stop(sock: tcp_socket,
335
             -read_port: comm::port<result::result<~[u8], tcp_err_data>>) ->
336
    result::result<(), tcp_err_data> unsafe {
337
    log(debug, fmt!{"taking the read_port out of commission %?", read_port});
338
    let socket_data = ptr::addr_of(*sock.socket_data);
339 340 341
    read_stop_common_impl(socket_data)
}

342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
/**
 * Reads a single chunk of data from `tcp_socket`; block until data/error
 * recv'd
 *
 * Does a blocking read operation for a single chunk of data from a
 * `tcp_socket` until a data arrives or an error is received. The provided
 * `timeout_msecs` value is used to raise an error if the timeout period
 * passes without any data received.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
357
fn read(sock: tcp_socket, timeout_msecs: uint)
358
    -> result::result<~[u8],tcp_err_data> {
359
    let socket_data = ptr::addr_of(*(sock.socket_data));
360 361 362
    read_common_impl(socket_data, timeout_msecs)
}

363
/**
364
 * Reads a single chunk of data; returns a `future::future<~[u8]>`
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391
 * immediately
 *
 * Does a non-blocking read operation for a single chunk of data from a
 * `tcp_socket` and immediately returns a `future` value representing the
 * result. When resolving the returned `future`, it will block until data
 * arrives or an error is received. The provided `timeout_msecs`
 * value is used to raise an error if the timeout period passes without any
 * data received.
 *
 * # Safety
 *
 * This function can produce unsafe results if the call to `read_future` is
 * made, the `future::future` value returned is never resolved via
 * `future::get`, and then the `tcp_socket` passed in to `read_future` leaves
 * scope and is destructed before the task that runs the libuv read
 * operation completes.
 *
 * As such: If using `read_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released read
 * handle. Otherwise, use the blocking `tcp::read` function instead.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
392
fn read_future(sock: tcp_socket, timeout_msecs: uint)
393
    -> future::future<result::result<~[u8],tcp_err_data>> {
394
    let socket_data = ptr::addr_of(*(sock.socket_data));
395
    do future_spawn {
396
        read_common_impl(socket_data, timeout_msecs)
397
    }
398
}
399

400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468
/**
 * Bind an incoming client connection to a `net::tcp::tcp_socket`
 *
 * # Notes
 *
 * It is safe to call `net::tcp::accept` _only_ within the context of the
 * `new_connect_cb` callback provided as the final argument to the
 * `net::tcp::listen` function.
 *
 * The `new_conn` opaque value is provided _only_ as the first argument to the
 * `new_connect_cb` provided as a part of `net::tcp::listen`.
 * It can be safely sent to another task but it _must_ be
 * used (via `net::tcp::accept`) before the `new_connect_cb` call it was
 * provided to returns.
 *
 * This implies that a port/chan pair must be used to make sure that the
 * `new_connect_cb` call blocks until an attempt to create a
 * `net::tcp::tcp_socket` is completed.
 *
 * # Example
 *
 * Here, the `new_conn` is used in conjunction with `accept` from within
 * a task spawned by the `new_connect_cb` passed into `listen`
 *
 * ~~~~~~~~~~~
 * net::tcp::listen(remote_ip, remote_port, backlog)
 *     // this callback is ran once after the connection is successfully
 *     // set up
 *     {|kill_ch|
 *       // pass the kill_ch to your main loop or wherever you want
 *       // to be able to externally kill the server from
 *     }
 *     // this callback is ran when a new connection arrives
 *     {|new_conn, kill_ch|
 *     let cont_po = comm::port::<option<tcp_err_data>>();
 *     let cont_ch = comm::chan(cont_po);
 *     task::spawn {||
 *         let accept_result = net::tcp::accept(new_conn);
 *         if accept_result.is_err() {
 *             comm::send(cont_ch, result::get_err(accept_result));
 *             // fail?
 *         }
 *         else {
 *             let sock = result::get(accept_result);
 *             comm::send(cont_ch, true);
 *             // do work here
 *         }
 *     };
 *     alt comm::recv(cont_po) {
 *       // shut down listen()
 *       some(err_data) { comm::send(kill_chan, some(err_data)) }
 *       // wait for next connection
 *       none {}
 *     }
 * };
 * ~~~~~~~~~~~
 *
 * # Arguments
 *
 * * `new_conn` - an opaque value used to create a new `tcp_socket`
 *
 * # Returns
 *
 * On success, this function will return a `net::tcp::tcp_socket` as the
 * `ok` variant of a `result`. The `net::tcp::tcp_socket` is anchored within
 * the task that `accept` was called within for its lifetime. On failure,
 * this function will return a `net::tcp::tcp_err_data` record
 * as the `err` variant of a `result`.
 */
469 470 471 472 473 474
fn accept(new_conn: tcp_new_connection)
    -> result::result<tcp_socket, tcp_err_data> unsafe {

    alt new_conn{
      new_tcp_conn(server_handle_ptr) {
        let server_data_ptr = uv::ll::get_data_for_uv_handle(
475
            server_handle_ptr) as *tcp_listen_fc_data;
476
        let reader_po = comm::port::<result::result<~[u8], tcp_err_data>>();
477
        let iotask = (*server_data_ptr).iotask;
478 479 480
        let stream_handle_ptr = malloc_uv_tcp_t();
        *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
        let client_socket_data = @{
481 482
            reader_po: reader_po,
            reader_ch: comm::chan(reader_po),
483
            stream_handle_ptr : stream_handle_ptr,
484 485
            connect_req : uv::ll::connect_t(),
            write_req : uv::ll::write_t(),
486
            iotask : iotask
487
        };
488 489 490
        let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
        let client_stream_handle_ptr =
            (*client_socket_data_ptr).stream_handle_ptr;
491 492 493

        let result_po = comm::port::<option<tcp_err_data>>();
        let result_ch = comm::chan(result_po);
494 495 496 497 498 499 500

        // UNSAFE LIBUV INTERACTION BEGIN
        // .. normally this happens within the context of
        // a call to uv::hl::interact.. but we're breaking
        // the rules here because this always has to be
        // called within the context of a listen() new_connect_cb
        // callback (or it will likely fail and drown your cat)
501
        log(debug, ~"in interact cb for tcp::accept");
502 503 504 505
        let loop_ptr = uv::ll::get_loop_for_uv_handle(
            server_handle_ptr);
        alt uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
          0i32 {
506
            log(debug, ~"uv_tcp_init successful for client stream");
507 508 509
            alt uv::ll::accept(
                server_handle_ptr as *libc::c_void,
                client_stream_handle_ptr as *libc::c_void) {
510
              0i32 {
511
                log(debug, ~"successfully accepted client connection");
512
                uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
513 514
                                               client_socket_data_ptr
                                                   as *libc::c_void);
515
                comm::send(result_ch, none);
516 517
              }
              _ {
518
                log(debug, ~"failed to accept client conn");
519 520 521 522
                comm::send(result_ch, some(
                    uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
              }
            }
523 524
          }
          _ {
525
            log(debug, ~"failed to init client stream");
526 527 528 529 530
            comm::send(result_ch, some(
                uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
          }
        }
        // UNSAFE LIBUV INTERACTION END
531 532 533 534 535
        alt comm::recv(result_po) {
          some(err_data) {
            result::err(err_data)
          }
          none {
536
            result::ok(tcp_socket(client_socket_data))
537 538 539 540 541 542
          }
        }
      }
    }
}

543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570
/**
 * Bind to a given IP/port and listen for new connections
 *
 * # Arguments
 *
 * * `host_ip` - a `net::ip::ip_addr` representing a unique IP
 * (versions 4 or 6)
 * * `port` - a uint representing the port to listen on
 * * `backlog` - a uint representing the number of incoming connections
 * to cache in memory
 * * `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
 * * `on_establish_cb` - a callback that is evaluated if/when the listener
 * is successfully established. it takes no parameters
 * * `new_connect_cb` - a callback to be evaluated, on the libuv thread,
 * whenever a client attempts to conect on the provided ip/port. the
 * callback's arguments are:
 *     * `new_conn` - an opaque type that can be passed to
 *     `net::tcp::accept` in order to be converted to a `tcp_socket`.
 *     * `kill_ch` - channel of type `comm::chan<option<tcp_err_data>>`. this
 *     channel can be used to send a message to cause `listen` to begin
 *     closing the underlying libuv data structures.
 *
 * # returns
 *
 * a `result` instance containing empty data of type `()` on a
 * successful/normal shutdown, and a `tcp_listen_err_data` enum in the event
 * of listen exiting because of an error
 */
571
fn listen(-host_ip: ip::ip_addr, port: uint, backlog: uint,
572
          iotask: iotask,
573
          on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
574 575
          +new_connect_cb: fn~(tcp_new_connection,
                               comm::chan<option<tcp_err_data>>))
576
    -> result::result<(), tcp_listen_err_data> unsafe {
577
    do listen_common(host_ip, port, backlog, iotask, on_establish_cb)
578
        // on_connect_cb
579
        |handle| {
580 581 582 583 584 585 586 587
            let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
                as *tcp_listen_fc_data;
            let new_conn = new_tcp_conn(handle);
            let kill_ch = (*server_data_ptr).kill_ch;
            new_connect_cb(new_conn, kill_ch);
    }
}

588
fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint,
589 590 591
          iotask: iotask,
          on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
          -on_connect_cb: fn~(*uv::ll::uv_tcp_t))
592
    -> result::result<(), tcp_listen_err_data> unsafe {
593 594 595 596 597 598 599 600 601
    let stream_closed_po = comm::port::<()>();
    let kill_po = comm::port::<option<tcp_err_data>>();
    let kill_ch = comm::chan(kill_po);
    let server_stream = uv::ll::tcp_t();
    let server_stream_ptr = ptr::addr_of(server_stream);
    let server_data = {
        server_stream_ptr: server_stream_ptr,
        stream_closed_ch: comm::chan(stream_closed_po),
        kill_ch: kill_ch,
602
        on_connect_cb: on_connect_cb,
603
        iotask: iotask,
604 605 606 607
        mut active: true
    };
    let server_data_ptr = ptr::addr_of(server_data);

608
    let setup_result = do comm::listen |setup_ch| {
J
Jeff Olson 已提交
609
        // this is to address a compiler warning about
610 611 612 613 614 615
        // an implicit copy.. it seems that double nested
        // will defeat a move sigil, as is done to the host_ip
        // arg above.. this same pattern works w/o complaint in
        // tcp::connect (because the iotask::interact cb isn't
        // nested within a comm::listen block)
        let loc_ip = copy(host_ip);
616
        do iotask::interact(iotask) |loop_ptr| {
617
            alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
618
              0i32 {
619 620 621
                uv::ll::set_data_for_uv_handle(
                    server_stream_ptr,
                    server_data_ptr);
622 623 624
                let addr_str = ip::format_addr(loc_ip);
                let bind_result = alt loc_ip {
                  ip::ipv4(addr) {
625
                    log(debug, fmt!{"addr: %?", addr});
626 627 628 629 630
                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                    uv::ll::tcp_bind(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
                  ip::ipv6(addr) {
631
                    log(debug, fmt!{"addr: %?", addr});
632 633 634 635 636 637
                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                    uv::ll::tcp_bind6(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
                };
                alt bind_result {
638
                  0i32 {
639 640 641 642 643 644 645
                    alt uv::ll::listen(server_stream_ptr,
                                       backlog as libc::c_int,
                                       tcp_lfc_on_connection_cb) {
                      0i32 {
                        comm::send(setup_ch, none);
                      }
                      _ {
646
                        log(debug, ~"failure to uv_listen()");
647 648 649 650
                        let err_data = uv::ll::get_last_err_data(loop_ptr);
                        comm::send(setup_ch, some(err_data));
                      }
                    }
651 652
                  }
                  _ {
653
                    log(debug, ~"failure to uv_tcp_bind");
654 655 656 657 658 659
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
                    comm::send(setup_ch, some(err_data));
                  }
                }
              }
              _ {
660
                log(debug, ~"failure to uv_tcp_init");
661 662 663 664
                let err_data = uv::ll::get_last_err_data(loop_ptr);
                comm::send(setup_ch, some(err_data));
              }
            }
665 666
        };
        setup_ch.recv()
667
    };
668
    alt setup_result {
669
      some(err_data) {
670
        do iotask::interact(iotask) |loop_ptr| {
671 672
            log(debug, fmt!{"tcp::listen post-kill recv hl interact %?",
                            loop_ptr});
673 674 675 676 677
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
        stream_closed_po.recv();
        alt err_data.err_name {
678 679
          ~"EACCES" {
            log(debug, ~"Got EACCES error");
680 681
            result::err(access_denied)
          }
682 683
          ~"EADDRINUSE" {
            log(debug, ~"Got EADDRINUSE error");
684
            result::err(address_in_use)
685 686
          }
          _ {
687 688
            log(debug, fmt!{"Got '%s' '%s' libuv error",
                            err_data.err_name, err_data.err_msg});
689 690
            result::err(
                generic_listen_err(err_data.err_name, err_data.err_msg))
691 692 693 694 695
          }
        }
      }
      none {
        on_establish_cb(kill_ch);
N
Niko Matsakis 已提交
696
        let kill_result = comm::recv(kill_po);
B
Brian Anderson 已提交
697
        do iotask::interact(iotask) |loop_ptr| {
698 699
            log(debug, fmt!{"tcp::listen post-kill recv hl interact %?",
                            loop_ptr});
700 701 702
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
703
        stream_closed_po.recv();
704 705 706
        alt kill_result {
          // some failure post bind/listen
          some(err_data) {
707 708
            result::err(generic_listen_err(err_data.err_name,
                                            err_data.err_msg))
709 710 711 712 713 714 715 716 717 718
          }
          // clean exit
          none {
            result::ok(())
          }
        }
      }
    }
}

719 720 721 722 723 724 725 726 727 728 729 730 731 732 733
/**
 * Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`.
 *
 * This function takes ownership of a `net::tcp::tcp_socket`, returning it
 * stored within a buffered wrapper, which can be converted to a `io::reader`
 * or `io::writer`
 *
 * # Arguments
 *
 * * `sock` -- a `net::tcp::tcp_socket` that you want to buffer
 *
 * # Returns
 *
 * A buffered wrapper that you can cast as an `io::reader` or `io::writer`
 */
734
fn socket_buf(-sock: tcp_socket) -> tcp_socket_buf {
735
    tcp_socket_buf(@{ sock: sock, mut buf: ~[] })
736 737
}

738
/// Convenience methods extending `net::tcp::tcp_socket`
739
impl tcp_socket for tcp_socket {
740
    fn read_start() -> result::result<comm::port<
741
        result::result<~[u8], tcp_err_data>>, tcp_err_data> {
742 743
        read_start(self)
    }
744
    fn read_stop(-read_port:
745
                 comm::port<result::result<~[u8], tcp_err_data>>) ->
746
        result::result<(), tcp_err_data> {
747
        read_stop(self, read_port)
748
    }
749
    fn read(timeout_msecs: uint) ->
750
        result::result<~[u8], tcp_err_data> {
751 752 753
        read(self, timeout_msecs)
    }
    fn read_future(timeout_msecs: uint) ->
754
        future::future<result::result<~[u8], tcp_err_data>> {
755 756
        read_future(self, timeout_msecs)
    }
757
    fn write(raw_write_data: ~[u8])
758 759 760
        -> result::result<(), tcp_err_data> {
        write(self, raw_write_data)
    }
761
    fn write_future(raw_write_data: ~[u8])
762
        -> future::future<result::result<(), tcp_err_data>> {
763 764
        write_future(self, raw_write_data)
    }
765
}
766

767
/// Implementation of `io::reader` iface for a buffered `net::tcp::tcp_socket`
768
impl tcp_socket_buf of io::reader for @tcp_socket_buf {
769 770 771 772
    fn read(buf: &[mut u8], len: uint) -> uint {
        // Loop until our buffer has enough data in it for us to read from.
        while self.data.buf.len() < len {
            let read_result = read(self.data.sock, 0u);
773
            if read_result.is_err() {
774
                let err_data = read_result.get_err();
775 776 777 778

                if err_data.err_name == ~"EOF" {
                    break;
                } else {
779 780
                    debug!{"ERROR sock_buf as io::reader.read err %? %?",
                           err_data.err_name, err_data.err_msg};
781 782 783

                    ret 0;
                }
784 785
            }
            else {
786
                vec::push_all(self.data.buf, result::unwrap(read_result));
787 788
            }
        }
789 790 791 792 793 794 795 796 797 798 799

        let count = uint::min(len, self.data.buf.len());

        let mut data = ~[];
        self.data.buf <-> data;

        vec::u8::memcpy(buf, vec::view(data, 0, data.len()), count);

        vec::push_all(self.data.buf, vec::view(data, count, data.len()));

        count
800 801
    }
    fn read_byte() -> int {
802 803
        let bytes = ~[0];
        if self.read(bytes, 1u) == 0 { fail } else { bytes[0] as int }
804 805
    }
    fn unread_byte(amt: int) {
806
        vec::unshift((*(self.data)).buf, amt as u8);
807 808 809 810 811
    }
    fn eof() -> bool {
        false // noop
    }
    fn seek(dist: int, seek: io::seek_style) {
812
        log(debug, fmt!{"tcp_socket_buf seek stub %? %?", dist, seek});
813 814 815 816 817 818 819
        // noop
    }
    fn tell() -> uint {
        0u // noop
    }
}

820
/// Implementation of `io::reader` iface for a buffered `net::tcp::tcp_socket`
821
impl tcp_socket_buf of io::writer for @tcp_socket_buf {
822
    fn write(data: &[const u8]) unsafe {
823 824
        let socket_data_ptr =
            ptr::addr_of(*((*(self.data)).sock).socket_data);
825 826
        let w_result = write_common_impl(socket_data_ptr,
                                        vec::slice(data, 0, vec::len(data)));
827
        if w_result.is_err() {
828
            let err_data = w_result.get_err();
829 830
            log(debug, fmt!{"ERROR sock_buf as io::writer.writer err: %? %?",
                             err_data.err_name, err_data.err_msg});
831
        }
832 833
    }
    fn seek(dist: int, seek: io::seek_style) {
834
      log(debug, fmt!{"tcp_socket_buf seek stub %? %?", dist, seek});
835 836 837 838 839 840 841 842
        // noop
    }
    fn tell() -> uint {
        0u
    }
    fn flush() -> int {
        0
    }
843 844 845
    fn get_type() -> io::writer_type {
        io::file
    }
846 847
}

848 849
// INTERNAL API

850 851 852 853 854 855 856 857
fn tear_down_socket_data(socket_data: @tcp_socket_data) unsafe {
    let closed_po = comm::port::<()>();
    let closed_ch = comm::chan(closed_po);
    let close_data = {
        closed_ch: closed_ch
    };
    let close_data_ptr = ptr::addr_of(close_data);
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
858
    do iotask::interact((*socket_data).iotask) |loop_ptr| {
859 860
        log(debug, fmt!{"interact dtor for tcp_socket stream %? loop %?",
            stream_handle_ptr, loop_ptr});
861 862 863 864 865
        uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                       close_data_ptr);
        uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
    };
    comm::recv(closed_po);
866
    log(debug, fmt!{"about to free socket_data at %?", socket_data});
867 868
    rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                       as *libc::c_void);
869
    log(debug, ~"exiting dtor for tcp_socket");
870 871
}

872 873
// shared implementation for tcp::read
fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
874
    -> result::result<~[u8],tcp_err_data> unsafe {
875
    log(debug, ~"starting tcp::read");
876
    let iotask = (*socket_data).iotask;
877
    let rs_result = read_start_common_impl(socket_data);
878
    if result::is_err(rs_result) {
879 880 881 882
        let err_data = result::get_err(rs_result);
        result::err(err_data)
    }
    else {
883
        log(debug, ~"tcp::read before recv_timeout");
884 885
        let read_result = if timeout_msecs > 0u {
            timer::recv_timeout(
886
               iotask, timeout_msecs, result::get(rs_result))
887 888 889
        } else {
            some(comm::recv(result::get(rs_result)))
        };
890
        log(debug, ~"tcp::read after recv_timeout");
891 892
        alt read_result {
          none {
893
            log(debug, ~"tcp::read: timed out..");
894
            let err_data = {
895 896
                err_name: ~"TIMEOUT",
                err_msg: ~"req timed out"
897 898 899 900 901
            };
            read_stop_common_impl(socket_data);
            result::err(err_data)
          }
          some(data_result) {
902
            log(debug, ~"tcp::read got data");
903 904 905 906 907 908 909 910 911 912 913 914 915
            read_stop_common_impl(socket_data);
            data_result
          }
        }
    }
}

// shared impl for read_stop
fn read_stop_common_impl(socket_data: *tcp_socket_data) ->
    result::result<(), tcp_err_data> unsafe {
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
    let stop_po = comm::port::<option<tcp_err_data>>();
    let stop_ch = comm::chan(stop_po);
B
Brian Anderson 已提交
916
    do iotask::interact((*socket_data).iotask) |loop_ptr| {
917
        log(debug, ~"in interact cb for tcp::read_stop");
918 919
        alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
          0i32 {
920
            log(debug, ~"successfully called uv_read_stop");
921 922 923
            comm::send(stop_ch, none);
          }
          _ {
924
            log(debug, ~"failure in calling uv_read_stop");
925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send(stop_ch, some(err_data.to_tcp_err()));
          }
        }
    };
    alt comm::recv(stop_po) {
      some(err_data) {
        result::err(err_data.to_tcp_err())
      }
      none {
        result::ok(())
      }
    }
}

// shared impl for read_start
fn read_start_common_impl(socket_data: *tcp_socket_data)
    -> result::result<comm::port<
943
        result::result<~[u8], tcp_err_data>>, tcp_err_data> unsafe {
944 945 946
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
    let start_po = comm::port::<option<uv::ll::uv_err_data>>();
    let start_ch = comm::chan(start_po);
947
    log(debug, ~"in tcp::read_start before interact loop");
B
Brian Anderson 已提交
948
    do iotask::interact((*socket_data).iotask) |loop_ptr| {
949
        log(debug, fmt!{"in tcp::read_start interact cb %?", loop_ptr});
950 951 952 953
        alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
                               on_alloc_cb,
                               on_tcp_read_cb) {
          0i32 {
954
            log(debug, ~"success doing uv_read_start");
955 956 957
            comm::send(start_ch, none);
          }
          _ {
958
            log(debug, ~"error attempting uv_read_start");
959 960 961 962 963 964 965 966 967 968 969 970 971 972 973
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send(start_ch, some(err_data));
          }
        }
    };
    alt comm::recv(start_po) {
      some(err_data) {
        result::err(err_data.to_tcp_err())
      }
      none {
        result::ok((*socket_data).reader_po)
      }
    }
}

974 975
// helper to convert a "class" vector of [u8] to a *[uv::ll::uv_buf_t]

976 977
// shared implementation used by write and write_future
fn write_common_impl(socket_data_ptr: *tcp_socket_data,
978
                     raw_write_data: ~[u8])
979 980 981 982
    -> result::result<(), tcp_err_data> unsafe {
    let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
    let stream_handle_ptr =
        (*socket_data_ptr).stream_handle_ptr;
983
    let write_buf_vec =  ~[ uv::ll::buf_init(
984
        vec::unsafe::to_ptr(raw_write_data),
985
        vec::len(raw_write_data)) ];
986 987 988 989 990 991
    let write_buf_vec_ptr = ptr::addr_of(write_buf_vec);
    let result_po = comm::port::<tcp_write_result>();
    let write_data = {
        result_ch: comm::chan(result_po)
    };
    let write_data_ptr = ptr::addr_of(write_data);
B
Brian Anderson 已提交
992
    do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| {
993
        log(debug, fmt!{"in interact cb for tcp::write %?", loop_ptr});
994 995 996 997 998
        alt uv::ll::write(write_req_ptr,
                          stream_handle_ptr,
                          write_buf_vec_ptr,
                          tcp_write_complete_cb) {
          0i32 {
999
            log(debug, ~"uv_write() invoked successfully");
1000 1001 1002
            uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
          }
          _ {
1003
            log(debug, ~"error invoking uv_write()");
1004 1005 1006 1007 1008 1009
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send((*write_data_ptr).result_ch,
                       tcp_write_error(err_data.to_tcp_err()));
          }
        }
    };
1010 1011 1012 1013
    // FIXME (#2656): Instead of passing unsafe pointers to local data,
    // and waiting here for the write to complete, we should transfer
    // ownership of everything to the I/O task and let it deal with the
    // aftermath, so we don't have to sit here blocking.
1014 1015 1016 1017 1018 1019
    alt comm::recv(result_po) {
      tcp_write_success { result::ok(()) }
      tcp_write_error(err_data) { result::err(err_data.to_tcp_err()) }
    }
}

1020 1021 1022 1023
enum tcp_new_connection {
    new_tcp_conn(*uv::ll::uv_tcp_t)
}

1024
type tcp_listen_fc_data = {
1025 1026 1027
    server_stream_ptr: *uv::ll::uv_tcp_t,
    stream_closed_ch: comm::chan<()>,
    kill_ch: comm::chan<option<tcp_err_data>>,
1028
    on_connect_cb: fn~(*uv::ll::uv_tcp_t),
1029
    iotask: iotask,
1030 1031 1032
    mut active: bool
};

G
Graydon Hoare 已提交
1033
extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
1034
    let server_data_ptr = uv::ll::get_data_for_uv_handle(
1035
        handle) as *tcp_listen_fc_data;
1036 1037 1038
    comm::send((*server_data_ptr).stream_closed_ch, ());
}

G
Graydon Hoare 已提交
1039
extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
1040 1041
                                     status: libc::c_int) unsafe {
    let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
1042
        as *tcp_listen_fc_data;
1043
    let kill_ch = (*server_data_ptr).kill_ch;
1044
    if (*server_data_ptr).active {
1045 1046
        alt status {
          0i32 {
1047
            (*server_data_ptr).on_connect_cb(handle);
1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059
          }
          _ {
            let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
            comm::send(kill_ch,
                       some(uv::ll::get_last_err_data(loop_ptr)
                            .to_tcp_err()));
            (*server_data_ptr).active = false;
          }
        }
    }
}

1060
fn malloc_uv_tcp_t() -> *uv::ll::uv_tcp_t unsafe {
1061
    rustrt::rust_uv_current_kernel_malloc(
1062
        rustrt::rust_uv_helper_uv_tcp_t_size()) as *uv::ll::uv_tcp_t
1063 1064
}

1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080
enum tcp_connect_result {
    tcp_connected(tcp_socket),
    tcp_connect_error(tcp_err_data)
}

enum tcp_write_result {
    tcp_write_success,
    tcp_write_error(tcp_err_data)
}

enum tcp_read_start_result {
    tcp_read_start_success(comm::port<tcp_read_result>),
    tcp_read_start_error(tcp_err_data)
}

enum tcp_read_result {
1081
    tcp_read_data(~[u8]),
1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095
    tcp_read_done,
    tcp_read_err(tcp_err_data)
}

iface to_tcp_err_iface {
    fn to_tcp_err() -> tcp_err_data;
}

impl of to_tcp_err_iface for uv::ll::uv_err_data {
    fn to_tcp_err() -> tcp_err_data {
        { err_name: self.err_name, err_msg: self.err_msg }
    }
}

G
Graydon Hoare 已提交
1096
extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
1097 1098
                    nread: libc::ssize_t,
                    ++buf: uv::ll::uv_buf_t) unsafe {
1099 1100
    log(debug, fmt!{"entering on_tcp_read_cb stream: %? nread: %?",
                    stream, nread});
1101 1102 1103
    let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
    let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
        as *tcp_socket_data;
1104
    alt nread as int {
1105 1106
      // incoming err.. probably eof
      -1 {
1107
        let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
1108 1109
        log(debug, fmt!{"on_tcp_read_cb: incoming err.. name %? msg %?",
                        err_data.err_name, err_data.err_msg});
1110 1111
        let reader_ch = (*socket_data_ptr).reader_ch;
        comm::send(reader_ch, result::err(err_data));
1112 1113 1114 1115 1116 1117
      }
      // do nothing .. unneeded buf
      0 {}
      // have data
      _ {
        // we have data
1118
        log(debug, fmt!{"tcp on_read_cb nread: %d", nread as int});
1119
        let reader_ch = (*socket_data_ptr).reader_ch;
1120
        let buf_base = uv::ll::get_base_from_buf(buf);
1121
        let new_bytes = vec::unsafe::from_buf(buf_base, nread as uint);
1122
        comm::send(reader_ch, result::ok(new_bytes));
1123 1124 1125
      }
    }
    uv::ll::free_base_of_buf(buf);
1126
    log(debug, ~"exiting on_tcp_read_cb");
1127 1128
}

G
Graydon Hoare 已提交
1129
extern fn on_alloc_cb(handle: *libc::c_void,
1130
                     ++suggested_size: size_t)
1131
    -> uv::ll::uv_buf_t unsafe {
1132
    log(debug, ~"tcp read on_alloc_cb!");
1133
    let char_ptr = uv::ll::malloc_buf_base_of(suggested_size);
1134
    log(debug, fmt!{"tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
1135 1136
                     handle,
                     char_ptr as uint,
1137
                     suggested_size as uint});
1138
    uv::ll::buf_init(char_ptr, suggested_size as uint)
1139
}
1140 1141 1142 1143 1144

type tcp_socket_close_data = {
    closed_ch: comm::chan<()>
};

G
Graydon Hoare 已提交
1145
extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
1146 1147 1148 1149
    let data = uv::ll::get_data_for_uv_handle(handle)
        as *tcp_socket_close_data;
    let closed_ch = (*data).closed_ch;
    comm::send(closed_ch, ());
1150
    log(debug, ~"tcp_socket_dtor_close_cb exiting..");
1151 1152
}

G
Graydon Hoare 已提交
1153
extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
1154 1155 1156
                              status: libc::c_int) unsafe {
    let write_data_ptr = uv::ll::get_data_for_req(write_req)
        as *write_req_data;
1157
    if status == 0i32 {
1158
        log(debug, ~"successful write complete");
1159
        comm::send((*write_data_ptr).result_ch, tcp_write_success);
1160
    } else {
1161 1162 1163 1164
        let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
            write_req);
        let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
1165
        log(debug, ~"failure to write");
1166 1167 1168 1169 1170 1171 1172 1173
        comm::send((*write_data_ptr).result_ch, tcp_write_error(err_data));
    }
}

type write_req_data = {
    result_ch: comm::chan<tcp_write_result>
};

J
Jeff Olson 已提交
1174 1175 1176 1177 1178
type connect_req_data = {
    result_ch: comm::chan<conn_attempt>,
    closed_signal_ch: comm::chan<()>
};

G
Graydon Hoare 已提交
1179
extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
J
Jeff Olson 已提交
1180 1181 1182
    let data = uv::ll::get_data_for_uv_handle(handle) as
        *connect_req_data;
    comm::send((*data).closed_signal_ch, ());
1183
    log(debug, fmt!{"exiting steam_error_close_cb for %?", handle});
J
Jeff Olson 已提交
1184 1185
}

G
Graydon Hoare 已提交
1186
extern fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
1187
    log(debug, fmt!{"closed client tcp handle %?", handle});
J
Jeff Olson 已提交
1188 1189
}

G
Graydon Hoare 已提交
1190
extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
J
Jeff Olson 已提交
1191 1192 1193 1194
                                   status: libc::c_int) unsafe {
    let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
                      as *connect_req_data);
    let result_ch = (*conn_data_ptr).result_ch;
1195
    log(debug, fmt!{"tcp_connect result_ch %?", result_ch});
J
Jeff Olson 已提交
1196 1197 1198 1199
    let tcp_stream_ptr =
        uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
    alt status {
      0i32 {
1200
        log(debug, ~"successful tcp connection!");
J
Jeff Olson 已提交
1201 1202 1203
        comm::send(result_ch, conn_success);
      }
      _ {
1204
        log(debug, ~"error in tcp_connect_on_connect_cb");
J
Jeff Olson 已提交
1205 1206
        let loop_ptr = uv::ll::get_loop_for_uv_handle(tcp_stream_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
1207 1208
        log(debug, fmt!{"err_data %? %?", err_data.err_name,
                        err_data.err_msg});
J
Jeff Olson 已提交
1209 1210 1211 1212 1213 1214
        comm::send(result_ch, conn_failure(err_data));
        uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
                                       conn_data_ptr);
        uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
      }
    }
1215
    log(debug, ~"leaving tcp_connect_on_connect_cb");
J
Jeff Olson 已提交
1216 1217 1218 1219 1220 1221 1222 1223
}

enum conn_attempt {
    conn_success,
    conn_failure(uv::ll::uv_err_data)
}

type tcp_socket_data = {
1224 1225
    reader_po: comm::port<result::result<~[u8], tcp_err_data>>,
    reader_ch: comm::chan<result::result<~[u8], tcp_err_data>>,
1226
    stream_handle_ptr: *uv::ll::uv_tcp_t,
J
Jeff Olson 已提交
1227
    connect_req: uv::ll::uv_connect_t,
1228
    write_req: uv::ll::uv_write_t,
1229
    iotask: iotask
J
Jeff Olson 已提交
1230 1231
};

1232 1233
type tcp_buffered_socket_data = {
    sock: tcp_socket,
1234
    mut buf: ~[u8]
1235
};
J
Jeff Olson 已提交
1236

1237
//#[cfg(test)]
1238
mod test {
1239
    // FIXME don't run on fbsd or linux 32 bit (#2064)
1240 1241 1242 1243 1244 1245 1246 1247 1248 1249
    #[cfg(target_os="win32")]
    #[cfg(target_os="darwin")]
    #[cfg(target_os="linux")]
    mod tcp_ipv4_server_and_client_test {
        #[cfg(target_arch="x86_64")]
        mod impl64 {
            #[test]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1250
            #[test]
1251 1252 1253
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1254 1255 1256 1257 1258 1259 1260
            #[test]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
1261
            }
1262 1263 1264 1265
            #[test]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
            }
1266

1267 1268 1269 1270 1271 1272 1273 1274
        }
        #[cfg(target_arch="x86")]
        mod impl32 {
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1275 1276
            #[test]
            #[ignore(cfg(target_os = "linux"))]
1277 1278 1279
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1280 1281 1282 1283 1284 1285 1286
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            #[ignore(cfg(target_os = "linux"))]
1287
            #[ignore(cfg(windows), reason = "deadlocking bots")]
1288 1289 1290
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
            }
1291 1292 1293 1294
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
1295
            }
1296
        }
1297
    }
1298
    fn impl_gl_tcp_ipv4_server_and_client() {
1299
        let hl_loop = uv::global_loop::get();
1300
        let server_ip = ~"127.0.0.1";
1301
        let server_port = 8888u;
1302 1303
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1304

1305
        let server_result_po = comm::port::<~str>();
1306
        let server_result_ch = comm::chan(server_result_po);
1307 1308 1309

        let cont_po = comm::port::<()>();
        let cont_ch = comm::chan(cont_po);
1310
        // server
1311
        do task::spawn_sched(task::manual_threads(1u)) {
B
Brian Anderson 已提交
1312
            let actual_req = do comm::listen |server_ch| {
1313 1314 1315 1316
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
1317
                    server_ch,
1318 1319
                    cont_ch,
                    hl_loop)
1320 1321 1322
            };
            server_result_ch.send(actual_req);
        };
1323
        comm::recv(cont_po);
1324
        // client
1325
        log(debug, ~"server started, firing up client..");
1326
        let actual_resp_result = do comm::listen |client_ch| {
1327 1328 1329 1330
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1331 1332
                client_ch,
                hl_loop)
1333
        };
1334
        assert actual_resp_result.is_ok();
1335
        let actual_resp = actual_resp_result.get();
1336
        let actual_req = comm::recv(server_result_po);
1337 1338 1339 1340
        log(debug, fmt!{"REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req});
        log(debug, fmt!{"RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp});
1341 1342
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
1343
    }
1344
    fn impl_gl_tcp_ipv4_client_error_connection_refused() {
1345
        let hl_loop = uv::global_loop::get();
1346
        let server_ip = ~"127.0.0.1";
1347
        let server_port = 8889u;
1348
        let expected_req = ~"ping";
1349
        // client
1350
        log(debug, ~"firing up client..");
1351
        let actual_resp_result = do comm::listen |client_ch| {
1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
                client_ch,
                hl_loop)
        };
        alt actual_resp_result.get_err() {
          connection_refused {
          }
          _ {
1363
            fail ~"unknown error.. expected connection_refused"
1364 1365 1366
          }
        }
    }
1367 1368
    fn impl_gl_tcp_ipv4_server_address_in_use() {
        let hl_loop = uv::global_loop::get();
1369
        let server_ip = ~"127.0.0.1";
1370
        let server_port = 8890u;
1371 1372
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1373

1374
        let server_result_po = comm::port::<~str>();
1375 1376 1377 1378 1379
        let server_result_ch = comm::chan(server_result_po);

        let cont_po = comm::port::<()>();
        let cont_ch = comm::chan(cont_po);
        // server
1380
        do task::spawn_sched(task::manual_threads(1u)) {
B
Brian Anderson 已提交
1381
            let actual_req = do comm::listen |server_ch| {
1382
                run_tcp_test_server(
1383 1384 1385 1386
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
1387 1388
                    cont_ch,
                    hl_loop)
1389 1390 1391 1392
            };
            server_result_ch.send(actual_req);
        };
        comm::recv(cont_po);
1393 1394 1395 1396 1397 1398
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        // client.. just doing this so that the first server tears down
1399
        log(debug, ~"server started, firing up client..");
1400
        do comm::listen |client_ch| {
1401 1402 1403 1404
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1405 1406
                client_ch,
                hl_loop)
1407
        };
1408 1409 1410 1411 1412
        alt listen_err {
          address_in_use {
            assert true;
          }
          _ {
1413 1414
            fail ~"expected address_in_use listen error,"+
                      ~"but got a different error varient. check logs.";
1415 1416 1417 1418 1419
          }
        }
    }
    fn impl_gl_tcp_ipv4_server_access_denied() {
        let hl_loop = uv::global_loop::get();
1420
        let server_ip = ~"127.0.0.1";
1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431
        let server_port = 80u;
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        alt listen_err {
          access_denied {
            assert true;
          }
          _ {
1432 1433
            fail ~"expected address_in_use listen error,"+
                      ~"but got a different error varient. check logs.";
1434 1435 1436
          }
        }
    }
1437 1438
    fn impl_gl_tcp_ipv4_server_client_reader_writer() {
        let iotask = uv::global_loop::get();
1439
        let server_ip = ~"127.0.0.1";
1440
        let server_port = 8891u;
1441 1442
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1443

1444
        let server_result_po = comm::port::<~str>();
1445 1446 1447 1448 1449
        let server_result_ch = comm::chan(server_result_po);

        let cont_po = comm::port::<()>();
        let cont_ch = comm::chan(cont_po);
        // server
1450
        do task::spawn_sched(task::manual_threads(1u)) {
1451
            let actual_req = do comm::listen |server_ch| {
1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
                    cont_ch,
                    iotask)
            };
            server_result_ch.send(actual_req);
        };
        comm::recv(cont_po);
        // client
        let server_addr = ip::v4::parse_addr(server_ip);
        let conn_result = connect(server_addr, server_port, iotask);
1466
        if result::is_err(conn_result) {
1467 1468 1469 1470 1471 1472
            assert false;
        }
        let sock_buf = @socket_buf(result::unwrap(conn_result));
        buf_write(sock_buf as io::writer, expected_req);

        // so contrived!
1473
        let actual_resp = do str::as_bytes(expected_resp) |resp_buf| {
1474 1475 1476
            buf_read(sock_buf as io::reader,
                     vec::len(resp_buf))
        };
1477

1478
        let actual_req = comm::recv(server_result_po);
1479 1480 1481 1482
        log(debug, fmt!{"REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req});
        log(debug, fmt!{"RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp});
1483 1484 1485
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
    }
1486

1487
    fn buf_write(+w: io::writer, val: ~str) {
1488
        log(debug, fmt!{"BUF_WRITE: val len %?", str::len(val)});
1489
        do str::byte_slice(val) |b_slice| {
1490 1491
            log(debug, fmt!{"BUF_WRITE: b_slice len %?",
                            vec::len(b_slice)});
1492
            w.write(b_slice)
1493 1494 1495
        }
    }

1496
    fn buf_read(+r: io::reader, len: uint) -> ~str {
1497
        let new_bytes = r.read_bytes(len);
1498 1499
        log(debug, fmt!{"in buf_read.. new_bytes len: %?",
                        vec::len(new_bytes)});
1500 1501
        str::from_bytes(new_bytes)
    }
1502

1503 1504
    fn run_tcp_test_server(server_ip: ~str, server_port: uint, resp: ~str,
                          server_ch: comm::chan<~str>,
1505
                          cont_ch: comm::chan<()>,
1506
                          iotask: iotask) -> ~str {
1507 1508 1509
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1510
            |kill_ch| {
1511 1512
                log(debug, fmt!{"establish_cb %?",
                    kill_ch});
1513 1514 1515 1516
                comm::send(cont_ch, ());
            },
            // risky to run this on the loop, but some users
            // will want the POWER
1517
            |new_conn, kill_ch| {
1518
            log(debug, ~"SERVER: new connection!");
1519
            do comm::listen |cont_ch| {
1520
                do task::spawn_sched(task::manual_threads(1u)) {
1521
                    log(debug, ~"SERVER: starting worker for new req");
1522 1523

                    let accept_result = accept(new_conn);
1524
                    log(debug, ~"SERVER: after accept()");
1525
                    if result::is_err(accept_result) {
1526
                        log(debug, ~"SERVER: error accept connection");
1527 1528 1529
                        let err_data = result::get_err(accept_result);
                        comm::send(kill_ch, some(err_data));
                        log(debug,
1530
                            ~"SERVER/WORKER: send on err cont ch");
1531 1532 1533 1534
                        cont_ch.send(());
                    }
                    else {
                        log(debug,
1535
                            ~"SERVER/WORKER: send on cont ch");
1536 1537
                        cont_ch.send(());
                        let sock = result::unwrap(accept_result);
1538 1539
                        log(debug, ~"SERVER: successfully accepted"+
                            ~"connection!");
1540 1541 1542
                        let received_req_bytes = read(sock, 0u);
                        alt received_req_bytes {
                          result::ok(data) {
1543
                            log(debug, ~"SERVER: got REQ str::from_bytes..");
1544 1545
                            log(debug, fmt!{"SERVER: REQ data len: %?",
                                            vec::len(data)});
1546 1547
                            server_ch.send(
                                str::from_bytes(data));
1548
                            log(debug, ~"SERVER: before write");
1549
                            tcp_write_single(sock, str::bytes(resp));
1550
                            log(debug, ~"SERVER: after write.. die");
1551 1552 1553
                            comm::send(kill_ch, none);
                          }
                          result::err(err_data) {
1554 1555
                            log(debug, fmt!{"SERVER: error recvd: %s %s",
                                err_data.err_name, err_data.err_msg});
1556
                            comm::send(kill_ch, some(err_data));
1557
                            server_ch.send(~"");
1558
                          }
1559
                        }
1560
                        log(debug, ~"SERVER: worker spinning down");
1561
                    }
1562
                }
1563
                log(debug, ~"SERVER: waiting to recv on cont_ch");
1564 1565
                cont_ch.recv()
            };
1566
            log(debug, ~"SERVER: recv'd on cont_ch..leaving listen cb");
1567 1568 1569 1570 1571
        });
        // err check on listen_result
        if result::is_err(listen_result) {
            alt result::get_err(listen_result) {
              generic_listen_err(name, msg) {
1572 1573
                fail fmt!{"SERVER: exited abnormally name %s msg %s",
                                name, msg};
1574
              }
1575
              access_denied {
1576
                fail ~"SERVER: exited abnormally, got access denied..";
1577 1578
              }
              address_in_use {
1579
                fail ~"SERVER: exited abnormally, got address in use...";
1580
              }
1581
            }
1582
        }
1583
        let ret_val = server_ch.recv();
1584
        log(debug, fmt!{"SERVER: exited and got ret val: '%s'", ret_val});
1585 1586 1587
        ret_val
    }

1588
    fn run_tcp_test_server_fail(server_ip: ~str, server_port: uint,
1589 1590 1591 1592
                          iotask: iotask) -> tcp_listen_err_data {
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1593
            |kill_ch| {
1594 1595
                log(debug, fmt!{"establish_cb %?",
                    kill_ch});
1596
            },
1597
            |new_conn, kill_ch| {
1598 1599
                fail fmt!{"SERVER: shouldn't be called.. %? %?",
                           new_conn, kill_ch};
1600 1601
        });
        // err check on listen_result
1602
        if result::is_err(listen_result) {
1603 1604 1605
            result::get_err(listen_result)
        }
        else {
1606
            fail ~"SERVER: did not fail as expected"
1607 1608 1609
        }
    }

1610 1611 1612
    fn run_tcp_test_client(server_ip: ~str, server_port: uint, resp: ~str,
                          client_ch: comm::chan<~str>,
                          iotask: iotask) -> result::result<~str,
1613
                                                    tcp_connect_err_data> {
1614
        let server_ip_addr = ip::v4::parse_addr(server_ip);
1615

1616
        log(debug, ~"CLIENT: starting..");
1617
        let connect_result = connect(server_ip_addr, server_port, iotask);
1618
        if result::is_err(connect_result) {
1619
            log(debug, ~"CLIENT: failed to connect");
1620
            let err_data = result::get_err(connect_result);
1621
            err(err_data)
1622
        }
1623 1624 1625 1626
        else {
            let sock = result::unwrap(connect_result);
            let resp_bytes = str::bytes(resp);
            tcp_write_single(sock, resp_bytes);
1627
            let read_result = sock.read(0u);
1628
            if read_result.is_err() {
1629 1630
                log(debug, ~"CLIENT: failure to read");
                ok(~"")
1631
            }
1632 1633 1634
            else {
                client_ch.send(str::from_bytes(read_result.get()));
                let ret_val = client_ch.recv();
1635 1636
                log(debug, fmt!{"CLIENT: after client_ch recv ret: '%s'",
                   ret_val});
1637
                ok(ret_val)
1638 1639
            }
        }
1640 1641
    }

1642
    fn tcp_write_single(sock: tcp_socket, val: ~[u8]) {
1643 1644
        let write_result_future = sock.write_future(val);
        let write_result = write_result_future.get();
1645
        if result::is_err(write_result) {
1646
            log(debug, ~"tcp_write_single: write failed!");
1647
            let err_data = result::get_err(write_result);
1648 1649
            log(debug, fmt!{"tcp_write_single err name: %s msg: %s",
                err_data.err_name, err_data.err_msg});
1650
            // meh. torn on what to do here.
1651
            fail ~"tcp_write_single failed";
1652
        }
1653
    }
1654
}