net_tcp.rs 57.2 KB
Newer Older
1 2 3 4
#[doc="
High-level interface to libuv's TCP functionality
"];

J
Jeff Olson 已提交
5
import ip = net_ip;
6 7
import uv::iotask;
import uv::iotask::iotask;
8
import comm::methods;
9
import future_spawn = future::spawn;
10 11 12 13
// FIXME: should be able to replace w/ result::{result, extensions};
import result::*;
import libc::size_t;
import str::extensions;
J
Jeff Olson 已提交
14

15
// tcp interfaces
16
export tcp_socket;
17 18
// buffered socket
export tcp_socket_buf, socket_buf;
19 20
// errors
export tcp_err_data, tcp_connect_err_data;
21
// operations on a tcp_socket
22
export write, write_future, read_start, read_stop;
23
// tcp server stuff
24
export listen, accept;
25 26
// tcp client stuff
export connect;
J
Jeff Olson 已提交
27

28 29
#[nolink]
native mod rustrt {
    // Raw allocation helpers provided by the native runtime. Within the
    // code of this module, `rust_uv_current_kernel_free` is what releases
    // the `uv_tcp_t` handle storage owned by a socket.
    fn rust_uv_current_kernel_malloc(size: libc::c_uint) -> *libc::c_void;
    fn rust_uv_current_kernel_free(mem: *libc::c_void);
    // NOTE(review): presumably reports sizeof(uv_tcp_t) so the handle
    // allocation can be sized correctly -- confirm against the C helper.
    fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
}

35 36
#[doc="
Encapsulates an open TCP/IP connection through libuv
37

38 39 40
`tcp_socket` is non-copyable/sendable and automagically handles closing the
underlying libuv data structures when it goes out of scope. This is the
data structure that is used for read/write operations over a TCP stream.
41
"]
42 43 44 45
class tcp_socket {
  let socket_data: @tcp_socket_data;
  new(socket_data: @tcp_socket_data) { self.socket_data = socket_data; }
  drop {
46
    unsafe {
47
        tear_down_socket_data(socket_data)
48 49
    }
  }
J
Jeff Olson 已提交
50 51
}

52 53 54 55 56 57 58 59 60 61
#[doc="
A buffered wrapper for `net::tcp::tcp_socket`

It is created with a call to `net::tcp::socket_buf()` and has impls that
satisfy both the `io::reader` and `io::writer` ifaces.
"]
resource tcp_socket_buf(data: @tcp_buffered_socket_data) {
    // The destructor body only logs; nothing is released explicitly here.
    log(debug, #fmt("dtor for tcp_socket_buf.. %?", data));
}

62 63 64
#[doc="
Contains raw, string-based, error information returned from libuv
"]
65 66 67 68
type tcp_err_data = {
    err_name: str,
    err_msg: str
};
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
#[doc="
Details returned as part of a `result::err` result from `tcp::listen`
"]
enum tcp_listen_err_data {
    #[doc="
    Some unplanned-for error. The first and second fields correspond
    to libuv's `err_name` and `err_msg` fields, respectively.
    "]
    generic_listen_err(str, str),
    #[doc="
    Failed to bind to the requested IP/Port, because it is already in use.

    # Possible Causes

    * Attempting to bind to a port already bound to another listener
    "]
    address_in_use,
    #[doc="
    Request to bind to an IP/Port was denied by the system.

    # Possible Causes

    * Attempting to bind to an IP/Port as a non-Administrator
      on Windows Vista+
    * Attempting to bind, as a non-priv'd
      user, to 'privileged' ports (< 1024) on *nix
    "]
    access_denied
}
#[doc="
Details returned as part of a `result::err` result from `tcp::connect`
"]
101 102 103 104 105 106 107 108 109 110 111
enum tcp_connect_err_data {
    #[doc="
    Some unplanned-for error. The first and second fields correspond
    to libuv's `err_name` and `err_msg` fields, respectively.
    "]
    generic_connect_err(str, str),
    #[doc="
    Invalid IP or invalid port
    "]
    connection_refused
}
112

J
Jeff Olson 已提交
113 114 115 116 117
#[doc="
Initiate a client connection over TCP/IP

# Arguments

118
* `input_ip` - The IP address (versions 4 or 6) of the remote host
119
* `port` - the unsigned integer of the desired remote host port
120
* `iotask` - a `uv::iotask` that the tcp request will run on
J
Jeff Olson 已提交
121 122 123

# Returns

124 125 126 127
A `result` that, if the operation succeeds, contains a `net::net::tcp_socket`
that can be used to send and receive data to/from the remote host. In the
event of failure, a `net::tcp::tcp_connect_err_data` instance will be
returned
J
Jeff Olson 已提交
128
"]
129
fn connect(input_ip: ip::ip_addr, port: uint,
130
           iotask: iotask)
131
    -> result::result<tcp_socket, tcp_connect_err_data> unsafe {
J
Jeff Olson 已提交
132 133 134 135 136 137 138
    let result_po = comm::port::<conn_attempt>();
    let closed_signal_po = comm::port::<()>();
    let conn_data = {
        result_ch: comm::chan(result_po),
        closed_signal_ch: comm::chan(closed_signal_po)
    };
    let conn_data_ptr = ptr::addr_of(conn_data);
139
    let reader_po = comm::port::<result::result<[u8]/~, tcp_err_data>>();
140 141 142
    let stream_handle_ptr = malloc_uv_tcp_t();
    *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
    let socket_data = @{
143 144
        reader_po: reader_po,
        reader_ch: comm::chan(reader_po),
145 146 147
        stream_handle_ptr: stream_handle_ptr,
        connect_req: uv::ll::connect_t(),
        write_req: uv::ll::write_t(),
148
        iotask: iotask
J
Jeff Olson 已提交
149
    };
150
    let socket_data_ptr = ptr::addr_of(*socket_data);
J
Jeff Olson 已提交
151 152 153
    log(debug, #fmt("tcp_connect result_ch %?", conn_data.result_ch));
    // get an unsafe representation of our stream_handle_ptr that
    // we can send into the interact cb to be handled in libuv..
154
    log(debug, #fmt("stream_handle_ptr outside interact %?",
155
        stream_handle_ptr));
156
    iotask::interact(iotask) {|loop_ptr|
J
Jeff Olson 已提交
157
        log(debug, "in interact cb for tcp client connect..");
158
        log(debug, #fmt("stream_handle_ptr in interact %?",
J
Jeff Olson 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
            stream_handle_ptr));
        alt uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
          0i32 {
            log(debug, "tcp_init successful");
            alt input_ip {
              ipv4 {
                log(debug, "dealing w/ ipv4 connection..");
                let tcp_addr = ipv4_ip_addr_to_sockaddr_in(input_ip,
                                                           port);
                let tcp_addr_ptr = ptr::addr_of(tcp_addr);
                let connect_req_ptr =
                    ptr::addr_of((*socket_data_ptr).connect_req);
                alt uv::ll::tcp_connect(
                    connect_req_ptr,
                    stream_handle_ptr,
                    tcp_addr_ptr,
                    tcp_connect_on_connect_cb) {
                  0i32 {
                    log(debug, "tcp_connect successful");
                    // reusable data that we'll have for the
                    // duration..
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
181 182
                                               socket_data_ptr as
                                                  *libc::c_void);
J
Jeff Olson 已提交
183 184 185 186 187 188 189 190 191 192 193 194 195
                    // just so the connect_cb can send the
                    // outcome..
                    uv::ll::set_data_for_req(connect_req_ptr,
                                             conn_data_ptr);
                    log(debug, "leaving tcp_connect interact cb...");
                    // let tcp_connect_on_connect_cb send on
                    // the result_ch, now..
                  }
                  _ {
                    // immediate connect failure.. probably a garbage
                    // ip or somesuch
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
                    comm::send((*conn_data_ptr).result_ch,
196
                               conn_failure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
197 198 199 200 201 202 203 204 205 206 207 208
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                                   conn_data_ptr);
                    uv::ll::close(stream_handle_ptr, stream_error_close_cb);
                  }
                }
              }
            }
        }
          _ {
            // failure to create a tcp handle
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send((*conn_data_ptr).result_ch,
209
                       conn_failure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
210 211 212 213 214 215
          }
        }
    };
    alt comm::recv(result_po) {
      conn_success {
        log(debug, "tcp::connect - received success on result_po");
216
        result::ok(tcp_socket(socket_data))
J
Jeff Olson 已提交
217 218 219 220
      }
      conn_failure(err_data) {
        comm::recv(closed_signal_po);
        log(debug, "tcp::connect - received failure on result_po");
221 222 223 224 225 226 227 228
        // still have to free the malloc'd stream handle..
        rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                           as *libc::c_void);
        let tcp_conn_err = alt err_data.err_name {
          "ECONNREFUSED" { connection_refused }
          _ { generic_connect_err(err_data.err_name, err_data.err_msg) }
        };
        result::err(tcp_conn_err)
J
Jeff Olson 已提交
229 230 231
      }
    }
}
232 233

#[doc="
234
Write binary data to a tcp stream; Blocks until operation completes
235 236 237 238

# Arguments

* sock - a `tcp_socket` to write to
239
* raw_write_data - a vector of `[u8]/~` that will be written to the stream.
240 241 242 243
This value must remain valid for the duration of the `write` call

# Returns

244 245
A `result` object with a `nil` value as the `ok` variant, or a `tcp_err_data`
value as the `err` variant
246
"]
247
fn write(sock: tcp_socket, raw_write_data: [u8]/~)
248
    -> result::result<(), tcp_err_data> unsafe {
249
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
250 251 252 253 254 255 256 257
    write_common_impl(socket_data_ptr, raw_write_data)
}

#[doc="
Write binary data to tcp stream; Returns a `future::future` value immediately

# Safety

258 259 260 261 262 263
This function can produce unsafe results if:

1. the call to `write_future` is made
2. the `future::future` value returned is never resolved via
`future::get`
3. and then the `tcp_socket` passed in to `write_future` leaves
264
scope and is destructed before the task that runs the libuv write
265 266 267 268 269 270 271 272 273
operation completes.

As such: If using `write_future`, always be sure to resolve the returned
`future` so as to ensure libuv doesn't try to access a released write handle.
Otherwise, use the blocking `tcp::write` function instead.

# Arguments

* sock - a `tcp_socket` to write to
274
* raw_write_data - a vector of `[u8]/~` that will be written to the stream.
275 276 277 278 279 280 281 282
This value must remain valid for the duration of the `write` call

# Returns

A `future` value that, once the `write` operation completes, resolves to a
`result` object with a `nil` value as the `ok` variant, or a `tcp_err_data`
value as the `err` variant
"]
283
fn write_future(sock: tcp_socket, raw_write_data: [u8]/~)
284
    -> future<result::result<(), tcp_err_data>> unsafe {
285
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
286
    future_spawn {||
287
        let data_copy = copy(raw_write_data);
288
        write_common_impl(socket_data_ptr, data_copy);
289
    }
290 291
}

292
#[doc="
293
Begin reading binary data from an open TCP connection; used with `read_stop`
294 295 296

# Arguments

297
* sock -- a `net::tcp::tcp_socket` for the connection to read from
298 299 300 301 302 303

# Returns

* A `result` instance that will either contain a
`comm::port<tcp_read_result>` that the user can read (and optionally, loop
on) from until `read_stop` is called, or a `tcp_err_data` record
304
"]
305 306
fn read_start(sock: tcp_socket)
    -> result::result<comm::port<
307
        result::result<[u8]/~, tcp_err_data>>, tcp_err_data> unsafe {
308
    let socket_data = ptr::addr_of(*(sock.socket_data));
309
    read_start_common_impl(socket_data)
310
}
311

312
#[doc="
313 314 315 316 317
Stop reading from an open TCP connection; used with `read_start`

# Arguments

* `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on
318
"]
319 320
fn read_stop(sock: tcp_socket,
             -read_port: comm::port<result::result<[u8], tcp_err_data>>) ->
321
    result::result<(), tcp_err_data> unsafe {
322 323
    log(debug, #fmt("taking the read_port out of commission %?", read_port));
    let socket_data = ptr::addr_of(**sock);
324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
    read_stop_common_impl(socket_data)
}

#[doc="
Reads a single chunk of data from `tcp_socket`; block until data/error recv'd

Does a blocking read operation for a single chunk of data from a `tcp_socket`
until a data arrives or an error is received. The provided `timeout_msecs`
value is used to raise an error if the timeout period passes without any
data received.

# Arguments

* `sock` - a `net::tcp::tcp_socket` that you wish to read from
* `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
read attempt. Pass `0u` to wait indefinitely
"]
fn read(sock: tcp_socket, timeout_msecs: uint)
342
    -> result::result<[u8]/~,tcp_err_data> {
343
    let socket_data = ptr::addr_of(*(sock.socket_data));
344 345 346 347
    read_common_impl(socket_data, timeout_msecs)
}

#[doc="
348
Reads a single chunk of data; returns a `future::future<[u8]/~>` immediately
349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375

Does a non-blocking read operation for a single chunk of data from a
`tcp_socket` and immediately returns a `future` value representing the
result. When resolving the returned `future`, it will block until data
arrives or an error is received. The provided `timeout_msecs`
value is used to raise an error if the timeout period passes without any
data received.

# Safety

This function can produce unsafe results if the call to `read_future` is
made, the `future::future` value returned is never resolved via
`future::get`, and then the `tcp_socket` passed in to `read_future` leaves
scope and is destructed before the task that runs the libuv read
operation completes.

As such: If using `read_future`, always be sure to resolve the returned
`future` so as to ensure libuv doesn't try to access a released read handle.
Otherwise, use the blocking `tcp::read` function instead.

# Arguments

* `sock` - a `net::tcp::tcp_socket` that you wish to read from
* `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
read attempt. Pass `0u` to wait indefinitely
"]
fn read_future(sock: tcp_socket, timeout_msecs: uint)
376
    -> future<result::result<[u8]/~,tcp_err_data>> {
377
    let socket_data = ptr::addr_of(*(sock.socket_data));
378
    future_spawn {||
379
        read_common_impl(socket_data, timeout_msecs)
380
    }
381
}
382

383 384 385 386 387
#[doc="
Bind an incoming client connection to a `net::tcp::tcp_socket`

# Notes

It is safe to call `net::tcp::accept` _only_ within the context of the
`new_connect_cb` callback provided as the final argument to the
`net::tcp::listen` function.

The `new_conn` opaque value is provided _only_ as the first argument to the
`new_connect_cb` provided as a part of `net::tcp::listen`.
It can be safely sent to another task but it _must_ be
used (via `net::tcp::accept`) before the `new_connect_cb` call it was
provided to returns.

This implies that a port/chan pair must be used to make sure that the
`new_connect_cb` call blocks until an attempt to create a
`net::tcp::tcp_socket` is completed.

# Example

Here, the `new_conn` is used in conjunction with `accept` from within
a task spawned by the `new_connect_cb` passed into `listen`

~~~~~~~~~~~
net::tcp::listen(remote_ip, remote_port, backlog, iotask)
    // this callback is ran once after the connection is successfully
    // set up
    {|kill_ch|
      // pass the kill_ch to your main loop or wherever you want
      // to be able to externally kill the server from
    }
    // this callback is ran when a new connection arrives
    {|new_conn, kill_ch|
    let cont_po = comm::port::<option<tcp_err_data>>();
    let cont_ch = comm::chan(cont_po);
    task::spawn {||
        let accept_result = net::tcp::accept(new_conn);
        if accept_result.is_failure() {
            comm::send(cont_ch, some(result::get_err(accept_result)));
            // fail?
        }
        else {
            let sock = result::get(accept_result);
            comm::send(cont_ch, none);
            // do work here
        }
    };
    alt comm::recv(cont_po) {
      // shut down listen()
      some(err_data) { comm::send(kill_ch, some(err_data)) }
      // wait for next connection
      none {}
    }
};
~~~~~~~~~~~

# Arguments

* `new_conn` - an opaque value used to create a new `tcp_socket`

# Returns

On success, this function will return a `net::tcp::tcp_socket` as the
`ok` variant of a `result`. The `net::tcp::tcp_socket` is anchored within
the task that `accept` was called within for its lifetime. On failure,
this function will return a `net::tcp::tcp_err_data` record
as the `err` variant of a `result`.
"]
fn accept(new_conn: tcp_new_connection)
    -> result::result<tcp_socket, tcp_err_data> unsafe {

    alt new_conn{
      new_tcp_conn(server_handle_ptr) {
        // the listener's data record rides along on the server handle;
        // it supplies the iotask the new client socket will use
        let server_data_ptr = uv::ll::get_data_for_uv_handle(
            server_handle_ptr) as *tcp_listen_fc_data;
        let reader_po = comm::port::<result::result<[u8]/~, tcp_err_data>>();
        let iotask = (*server_data_ptr).iotask;
        let stream_handle_ptr = malloc_uv_tcp_t();
        *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
        let client_socket_data = @{
            reader_po: reader_po,
            reader_ch: comm::chan(reader_po),
            stream_handle_ptr : stream_handle_ptr,
            connect_req : uv::ll::connect_t(),
            write_req : uv::ll::write_t(),
            iotask : iotask
        };
        let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
        let client_stream_handle_ptr =
            (*client_socket_data_ptr).stream_handle_ptr;

        let result_po = comm::port::<option<tcp_err_data>>();
        let result_ch = comm::chan(result_po);

        // UNSAFE LIBUV INTERACTION BEGIN
        // .. normally this happens within the context of
        // a call to uv::hl::interact.. but we're breaking
        // the rules here because this always has to be
        // called within the context of a listen() new_connect_cb
        // callback (or it will likely fail and drown your cat)
        log(debug, "in interact cb for tcp::accept");
        let loop_ptr = uv::ll::get_loop_for_uv_handle(
            server_handle_ptr);
        alt uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
          0i32 {
            log(debug, "uv_tcp_init successful for client stream");
            alt uv::ll::accept(
                server_handle_ptr as *libc::c_void,
                client_stream_handle_ptr as *libc::c_void) {
              0i32 {
                log(debug, "successfully accepted client connection");
                uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
                                               client_socket_data_ptr
                                                   as *libc::c_void);
                comm::send(result_ch, none);
              }
              _ {
                log(debug, "failed to accept client conn");
                comm::send(result_ch, some(
                    uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
              }
            }
          }
          _ {
            log(debug, "failed to init client stream");
            comm::send(result_ch, some(
                uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
          }
        }
        // UNSAFE LIBUV INTERACTION END
        // NOTE(review): on either failure above the malloc'd
        // stream_handle_ptr is neither closed nor freed here, and no
        // tcp_socket is built whose destructor would do it -- looks like a
        // leak; confirm and decide how the handle should be released.
        alt comm::recv(result_po) {
          some(err_data) {
            result::err(err_data)
          }
          none {
            result::ok(tcp_socket(client_socket_data))
          }
        }
      }
    }
}

526 527 528 529 530 531 532 533 534 535
#[doc="
Bind to a given IP/port and listen for new connections

# Arguments

* `host_ip` - a `net::ip::ip_addr` representing a unique IP
(versions 4 or 6)
* `port` - a uint representing the port to listen on
* `backlog` - a uint representing the number of incoming connections
to cache in memory
536
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
537 538 539 540 541 542 543 544 545 546 547 548 549
* `on_establish_cb` - a callback that is evaluated if/when the listener
is successfully established. it takes no parameters
* `new_connect_cb` - a callback to be evaluated, on the libuv thread,
whenever a client attempts to conect on the provided ip/port. the
callback's arguments are:
    * `new_conn` - an opaque type that can be passed to
    `net::tcp::accept` in order to be converted to a `tcp_socket`.
    * `kill_ch` - channel of type `comm::chan<option<tcp_err_data>>`. this
    channel can be used to send a message to cause `listen` to begin
    closing the underlying libuv data structures.

# returns

550
a `result` instance containing empty data of type `()` on a
551
successful/normal shutdown, and a `tcp_listen_err_data` enum in the event
552
of listen exiting because of an error
553
"]
554
fn listen(host_ip: ip::ip_addr, port: uint, backlog: uint,
555
          iotask: iotask,
556
          on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
557 558
          +new_connect_cb: fn~(tcp_new_connection,
                               comm::chan<option<tcp_err_data>>))
559
    -> result::result<(), tcp_listen_err_data> unsafe {
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
    listen_common(host_ip, port, backlog, iotask, on_establish_cb)
        // on_connect_cb
        {|handle|
            let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
                as *tcp_listen_fc_data;
            let new_conn = new_tcp_conn(handle);
            let kill_ch = (*server_data_ptr).kill_ch;
            new_connect_cb(new_conn, kill_ch);
    }
}

fn listen_common(host_ip: ip::ip_addr, port: uint, backlog: uint,
          iotask: iotask,
          on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
          -on_connect_cb: fn~(*uv::ll::uv_tcp_t))
575
    -> result::result<(), tcp_listen_err_data> unsafe {
576 577 578 579 580 581 582 583 584
    let stream_closed_po = comm::port::<()>();
    let kill_po = comm::port::<option<tcp_err_data>>();
    let kill_ch = comm::chan(kill_po);
    let server_stream = uv::ll::tcp_t();
    let server_stream_ptr = ptr::addr_of(server_stream);
    let server_data = {
        server_stream_ptr: server_stream_ptr,
        stream_closed_ch: comm::chan(stream_closed_po),
        kill_ch: kill_ch,
585
        on_connect_cb: on_connect_cb,
586
        iotask: iotask,
587 588 589 590
        mut active: true
    };
    let server_data_ptr = ptr::addr_of(server_data);

591 592 593 594 595
    let setup_result = comm::listen {|setup_ch|
        iotask::interact(iotask) {|loop_ptr|
            let tcp_addr = ipv4_ip_addr_to_sockaddr_in(host_ip,
                                                       port);
            alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
596
              0i32 {
597 598 599
                uv::ll::set_data_for_uv_handle(
                    server_stream_ptr,
                    server_data_ptr);
600 601
                alt uv::ll::tcp_bind(server_stream_ptr,
                                     ptr::addr_of(tcp_addr)) {
602
                  0i32 {
603 604 605 606 607 608 609 610 611 612 613 614
                    alt uv::ll::listen(server_stream_ptr,
                                       backlog as libc::c_int,
                                       tcp_lfc_on_connection_cb) {
                      0i32 {
                        comm::send(setup_ch, none);
                      }
                      _ {
                        log(debug, "failure to uv_listen()");
                        let err_data = uv::ll::get_last_err_data(loop_ptr);
                        comm::send(setup_ch, some(err_data));
                      }
                    }
615 616
                  }
                  _ {
617
                    log(debug, "failure to uv_tcp_bind");
618 619 620 621 622 623
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
                    comm::send(setup_ch, some(err_data));
                  }
                }
              }
              _ {
624
                log(debug, "failure to uv_tcp_init");
625 626 627 628
                let err_data = uv::ll::get_last_err_data(loop_ptr);
                comm::send(setup_ch, some(err_data));
              }
            }
629 630
        };
        setup_ch.recv()
631
    };
632
    alt setup_result {
633
      some(err_data) {
634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656
        iotask::interact(iotask) {|loop_ptr|
            log(debug, #fmt("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
        stream_closed_po.recv();
        alt err_data.err_name {
          "EACCES" {
            log(debug, "Got EACCES error");
            result::err(access_denied)
          }
          "EADDRINUSE" {
            log(debug, "Got EADDRINUSE error");
            result::err(address_in_use)
          }
          _ {
            log(debug, #fmt("Got '%s' '%s' libuv error",
                            err_data.err_name, err_data.err_msg));
            result::err(
                generic_listen_err(err_data.err_name, err_data.err_msg))
          }
        }
657 658 659
      }
      none {
        on_establish_cb(kill_ch);
N
Niko Matsakis 已提交
660
        let kill_result = comm::recv(kill_po);
661
        iotask::interact(iotask) {|loop_ptr|
662 663 664 665 666
            log(debug, #fmt("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
667
        stream_closed_po.recv();
668 669 670
        alt kill_result {
          // some failure post bind/listen
          some(err_data) {
671 672
            result::err(generic_listen_err(err_data.err_name,
                                            err_data.err_msg))
673 674 675 676 677 678 679 680 681
          }
          // clean exit
          none {
            result::ok(())
          }
        }
      }
    }
}
682

683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701
#[doc="
Wrap a `net::tcp::tcp_socket` in a `net::tcp::tcp_socket_buf`.

Ownership of the `net::tcp::tcp_socket` moves into the returned buffered
wrapper, which can then be used as an `io::reader` or `io::writer`

# Arguments

* `sock` -- the `net::tcp::tcp_socket` to be buffered

# Returns

A buffered wrapper that you can cast as an `io::reader` or `io::writer`
"]
fn socket_buf(-sock: tcp_socket) -> tcp_socket_buf {
    // the wrapper starts out with an empty read buffer
    let buffered_data = @{ sock: sock, mut buf: [] };
    tcp_socket_buf(buffered_data)
}

702 703 704
#[doc="
Convenience methods extending `net::tcp::tcp_socket`
"]
705
impl tcp_socket for tcp_socket {
706 707 708 709
    fn read_start() -> result::result<comm::port<
        result::result<[u8]/~, tcp_err_data>>, tcp_err_data> {
        read_start(self)
    }
710 711
    fn read_stop(-read_port:
                 comm::port<result::result<[u8], tcp_err_data>>) ->
712
        result::result<(), tcp_err_data> {
713
        read_stop(self, read_port)
714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729
    }
    fn read(timeout_msecs: uint) ->
        result::result<[u8]/~, tcp_err_data> {
        read(self, timeout_msecs)
    }
    fn read_future(timeout_msecs: uint) ->
        future::future<result::result<[u8]/~, tcp_err_data>> {
        read_future(self, timeout_msecs)
    }
    fn write(raw_write_data: [u8]/~)
        -> result::result<(), tcp_err_data> {
        write(self, raw_write_data)
    }
    fn write_future(raw_write_data: [u8]/~)
        -> future::future<result::result<(), tcp_err_data>> {
        write_future(self, raw_write_data)
730
    }
731
}
732 733 734 735

#[doc="
Implementation of `io::reader` iface for a buffered `net::tcp::tcp_socket`
"]
736
impl tcp_socket_buf of io::reader for @tcp_socket_buf {
737 738 739 740 741 742 743 744 745 746 747 748 749 750
    fn read_bytes(amt: uint) -> [u8] {
        let has_amt_available =
            vec::len((*self).buf) >= amt;
        if has_amt_available {
            // no arbitrary-length shift in vec::?
            let mut ret_buf = [];
            while vec::len(ret_buf) < amt {
                ret_buf += [vec::shift((*self).buf)];
            }
            ret_buf
        }
        else {
            let read_result = read((*self).sock, 0u);
            if read_result.is_failure() {
751 752 753
                let err_data = read_result.get_err();
                log(debug, #fmt("ERROR sock_buf as io::reader.read err %? %?",
                                 err_data.err_name, err_data.err_msg));
754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783
                []
            }
            else {
                let new_chunk = result::unwrap(read_result);
                (*self).buf += new_chunk;
                self.read_bytes(amt)
            }
        }
    }
    fn read_byte() -> int {
        self.read_bytes(1u)[0] as int
    }
    fn unread_byte(amt: int) {
        vec::unshift((*self).buf, amt as u8);
    }
    fn eof() -> bool {
        false // noop
    }
    fn seek(dist: int, seek: io::seek_style) {
        log(debug, #fmt("tcp_socket_buf seek stub %? %?", dist, seek));
        // noop
    }
    fn tell() -> uint {
        0u // noop
    }
}

#[doc="
Implementation of `io::reader` iface for a buffered `net::tcp::tcp_socket`
"]
784
impl tcp_socket_buf of io::writer for @tcp_socket_buf {
785 786 787 788 789 790
    fn write(data: [const u8]/&) unsafe {
        let socket_data_ptr = ptr::addr_of(**((*self).sock));
        let write_buf_vec = vec::unpack_const_slice(data) {|ptr, len|
            [ uv::ll::buf_init(ptr as *u8, len) ]
        };
        let write_buf_vec_ptr = ptr::addr_of(write_buf_vec);
791 792 793 794 795 796
        let w_result = write_common_impl(socket_data_ptr, write_buf_vec_ptr);
        if w_result.is_failure() {
            let err_data = w_result.get_err();
            log(debug, #fmt("ERROR sock_buf as io::writer.writer err: %? %?",
                             err_data.err_name, err_data.err_msg));
        }
797 798 799 800 801 802 803 804 805 806 807 808 809
    }
    fn seek(dist: int, seek: io::seek_style) {
      log(debug, #fmt("tcp_socket_buf seek stub %? %?", dist, seek));
        // noop
    }
    fn tell() -> uint {
        0u
    }
    fn flush() -> int {
        0
    }
}

810 811
// INTERNAL API

812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833
// Closes the libuv stream behind `socket_data` on the iotask, waits for
// the close callback to fire, then frees the stream handle's memory.
fn tear_down_socket_data(socket_data: @tcp_socket_data) unsafe {
    // tcp_socket_dtor_close_cb signals on this port once libuv has
    // finished closing the handle
    let closed_po = comm::port::<()>();
    let close_data = { closed_ch: comm::chan(closed_po) };
    let close_data_ptr = ptr::addr_of(close_data);
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
    iotask::interact((*socket_data).iotask) {|loop_ptr|
        log(debug, #fmt("interact dtor for tcp_socket stream %? loop %?",
            stream_handle_ptr, loop_ptr));
        // the close cb finds its channel through the handle's data slot
        uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                       close_data_ptr);
        uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
    };
    // close_data lives on this stack frame, so stay here until the
    // callback has run
    comm::recv(closed_po);
    log(debug, #fmt("about to free socket_data at %?", socket_data));
    let handle_mem = stream_handle_ptr as *libc::c_void;
    rustrt::rust_uv_current_kernel_free(handle_mem);
    log(debug, "exiting dtor for tcp_socket");
}

834 835
// shared implementation for tcp::read
fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
836
    -> result::result<[u8]/~,tcp_err_data> unsafe {
837
    log(debug, "starting tcp::read");
838
    let iotask = (*socket_data).iotask;
839
    let rs_result = read_start_common_impl(socket_data);
840
    if result::is_err(rs_result) {
841 842 843 844 845 846 847
        let err_data = result::get_err(rs_result);
        result::err(err_data)
    }
    else {
        log(debug, "tcp::read before recv_timeout");
        let read_result = if timeout_msecs > 0u {
            timer::recv_timeout(
848
               iotask, timeout_msecs, result::get(rs_result))
849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877
        } else {
            some(comm::recv(result::get(rs_result)))
        };
        log(debug, "tcp::read after recv_timeout");
        alt read_result {
          none {
            log(debug, "tcp::read: timed out..");
            let err_data = {
                err_name: "TIMEOUT",
                err_msg: "req timed out"
            };
            read_stop_common_impl(socket_data);
            result::err(err_data)
          }
          some(data_result) {
            log(debug, "tcp::read got data");
            read_stop_common_impl(socket_data);
            data_result
          }
        }
    }
}

// shared impl for read_stop
//
// Asks the iotask to call uv_read_stop on the socket's stream and blocks
// until the outcome has been sent back. Returns ok(()) on success, or the
// libuv error converted to a tcp_err_data.
fn read_stop_common_impl(socket_data: *tcp_socket_data) ->
    result::result<(), tcp_err_data> unsafe {
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
    let stop_po = comm::port::<option<tcp_err_data>>();
    let stop_ch = comm::chan(stop_po);
    iotask::interact((*socket_data).iotask) {|loop_ptr|
        log(debug, "in interact cb for tcp::read_stop");
        alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
          0i32 {
            log(debug, "successfully called uv_read_stop");
            comm::send(stop_ch, none);
          }
          _ {
            log(debug, "failure in calling uv_read_stop");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            // converted here, so the receiving side gets a tcp_err_data
            comm::send(stop_ch, some(err_data.to_tcp_err()));
          }
        }
    };
    alt comm::recv(stop_po) {
      some(err_data) {
        // already a tcp_err_data; no second conversion needed
        result::err(err_data)
      }
      none {
        result::ok(())
      }
    }
}

// shared impl for read_start
fn read_start_common_impl(socket_data: *tcp_socket_data)
    -> result::result<comm::port<
905
        result::result<[u8]/~, tcp_err_data>>, tcp_err_data> unsafe {
906 907 908 909
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
    let start_po = comm::port::<option<uv::ll::uv_err_data>>();
    let start_ch = comm::chan(start_po);
    log(debug, "in tcp::read_start before interact loop");
910
    iotask::interact((*socket_data).iotask) {|loop_ptr|
911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935
        log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
        alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
                               on_alloc_cb,
                               on_tcp_read_cb) {
          0i32 {
            log(debug, "success doing uv_read_start");
            comm::send(start_ch, none);
          }
          _ {
            log(debug, "error attempting uv_read_start");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send(start_ch, some(err_data));
          }
        }
    };
    alt comm::recv(start_po) {
      some(err_data) {
        result::err(err_data.to_tcp_err())
      }
      none {
        result::ok((*socket_data).reader_po)
      }
    }
}

936 937
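// NOTE: no standalone helper for converting a vector of [u8] into a
// *[uv::ll::uv_buf_t] follows this comment; write_common_impl below builds
// the uv_buf_t vector inline.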

938 939
// shared implementation used by write and write_future
fn write_common_impl(socket_data_ptr: *tcp_socket_data,
940
                     raw_write_data: [u8]/~)
941 942 943 944 945 946
    -> result::result<(), tcp_err_data> unsafe {
    let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
    let stream_handle_ptr =
        (*socket_data_ptr).stream_handle_ptr;
    let write_buf_vec =  [ uv::ll::buf_init(
        vec::unsafe::to_ptr(raw_write_data),
947
        vec::len(raw_write_data)) ]/~;
948 949 950 951 952 953
    let write_buf_vec_ptr = ptr::addr_of(write_buf_vec);
    let result_po = comm::port::<tcp_write_result>();
    let write_data = {
        result_ch: comm::chan(result_po)
    };
    let write_data_ptr = ptr::addr_of(write_data);
954
    iotask::interact((*socket_data_ptr).iotask) {|loop_ptr|
955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971
        log(debug, #fmt("in interact cb for tcp::write %?", loop_ptr));
        alt uv::ll::write(write_req_ptr,
                          stream_handle_ptr,
                          write_buf_vec_ptr,
                          tcp_write_complete_cb) {
          0i32 {
            log(debug, "uv_write() invoked successfully");
            uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
          }
          _ {
            log(debug, "error invoking uv_write()");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send((*write_data_ptr).result_ch,
                       tcp_write_error(err_data.to_tcp_err()));
          }
        }
    };
972 973 974 975
    // FIXME (#2656): Instead of passing unsafe pointers to local data,
    // and waiting here for the write to complete, we should transfer
    // ownership of everything to the I/O task and let it deal with the
    // aftermath, so we don't have to sit here blocking.
976 977 978 979 980 981
    alt comm::recv(result_po) {
      tcp_write_success { result::ok(()) }
      tcp_write_error(err_data) { result::err(err_data.to_tcp_err()) }
    }
}

982 983 984 985
// Wrapper around a raw pointer to a libuv TCP handle.
// NOTE(review): presumably the value handed to a listener's
// new-connection callback and consumed by `accept` -- confirm against
// those functions, which are outside this part of the file.
enum tcp_new_connection {
    new_tcp_conn(*uv::ll::uv_tcp_t)
}

986
// State stored in the data slot of a listening uv_tcp_t and read back by
// the tcp_lfc_* callbacks.
type tcp_listen_fc_data = {
    server_stream_ptr: *uv::ll::uv_tcp_t,
    // tcp_lfc_close_cb sends () here once the server stream is closed
    stream_closed_ch: comm::chan<()>,
    // tcp_lfc_on_connection_cb sends some(err) here on a failed status
    kill_ch: comm::chan<option<tcp_err_data>>,
    // invoked with the server handle for each connection while active
    on_connect_cb: fn~(*uv::ll::uv_tcp_t),
    iotask: iotask,
    // cleared by tcp_lfc_on_connection_cb after it reports an error
    mut active: bool
};

995
// libuv close callback for a listening stream: signals on the listener's
// stream_closed_ch that the handle has been closed.
crust fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let raw_data = uv::ll::get_data_for_uv_handle(handle);
    let server_data_ptr = raw_data as *tcp_listen_fc_data;
    let closed_ch = (*server_data_ptr).stream_closed_ch;
    comm::send(closed_ch, ());
}

1001
// libuv connection callback for a listening stream. While the listener
// is active, a zero status is forwarded to the stored on_connect_cb; any
// other status is reported on kill_ch and deactivates the listener.
crust fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
                                     status: libc::c_int) unsafe {
    let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
        as *tcp_listen_fc_data;
    let kill_ch = (*server_data_ptr).kill_ch;
    if !(*server_data_ptr).active {
        // an earlier error already shut this listener down
        ret;
    }
    if status == 0i32 {
        (*server_data_ptr).on_connect_cb(handle);
    }
    else {
        let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
        let listen_err = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
        comm::send(kill_ch, some(listen_err));
        (*server_data_ptr).active = false;
    }
}

1022
// Allocates uninitialized memory for one uv_tcp_t through the runtime's
// kernel allocator; the size is queried from the runtime helper.
fn malloc_uv_tcp_t() -> *uv::ll::uv_tcp_t unsafe {
    let handle_size = rustrt::rust_uv_helper_uv_tcp_t_size();
    let raw_mem = rustrt::rust_uv_current_kernel_malloc(handle_size);
    raw_mem as *uv::ll::uv_tcp_t
}

1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042
// Outcome of a connect attempt: either the connected socket or the error
// that prevented the connection.
enum tcp_connect_result {
    tcp_connected(tcp_socket),
    tcp_connect_error(tcp_err_data)
}

// Outcome of a write; sent over write_req_data.result_ch and received by
// write_common_impl.
enum tcp_write_result {
    tcp_write_success,
    tcp_write_error(tcp_err_data)
}

// Outcome of starting a read: a port that yields tcp_read_result values,
// or the error that prevented the read from starting.
// NOTE(review): not referenced by the read_* functions visible nearby,
// which use result::result instead -- confirm it is still needed.
enum tcp_read_start_result {
    tcp_read_start_success(comm::port<tcp_read_result>),
    tcp_read_start_error(tcp_err_data)
}

enum tcp_read_result {
1043
    tcp_read_data([u8]/~),
1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057
    tcp_read_done,
    tcp_read_err(tcp_err_data)
}

// Conversion of an error value into this module's tcp_err_data.
iface to_tcp_err_iface {
    fn to_tcp_err() -> tcp_err_data;
}

// Converts a libuv error record by carrying over its name and message
// unchanged.
impl of to_tcp_err_iface for uv::ll::uv_err_data {
    fn to_tcp_err() -> tcp_err_data {
        let name = self.err_name;
        let msg = self.err_msg;
        { err_name: name, err_msg: msg }
    }
}

1058 1059 1060
// libuv read callback for a tcp_socket's stream.
//
// `nread` < 0 signals an error (probably eof): the loop's last error is
// forwarded on the socket's reader_ch. `nread` == 0 means there is
// nothing to deliver. Otherwise the first `nread` bytes of `buf` are
// copied into a vector and sent on reader_ch. The buffer's base is
// freed in every case.
crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
                    nread: libc::ssize_t,
                    ++buf: uv::ll::uv_buf_t) unsafe {
    log(debug, #fmt("entering on_tcp_read_cb stream: %? nread: %?",
                    stream, nread));
    let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
    let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
        as *tcp_socket_data;
    let bytes_read = nread as int;
    if bytes_read < 0 {
        // incoming err.. probably eof
        let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
        log(debug, #fmt("on_tcp_read_cb: incoming err.. name %? msg %?",
                        err_data.err_name, err_data.err_msg));
        let reader_ch = (*socket_data_ptr).reader_ch;
        comm::send(reader_ch, result::err(err_data));
    }
    else if bytes_read > 0 {
        // we have data
        log(debug, #fmt("tcp on_read_cb nread: %d", bytes_read));
        let reader_ch = (*socket_data_ptr).reader_ch;
        let buf_base = uv::ll::get_base_from_buf(buf);
        // copy only what libuv actually filled in: the buffer's own
        // length is the size on_alloc_cb allocated, not the number of
        // bytes received
        let new_bytes = vec::unsafe::from_buf(buf_base, bytes_read as uint);
        comm::send(reader_ch, result::ok(new_bytes));
    }
    // bytes_read == 0: do nothing .. unneeded buf
    uv::ll::free_base_of_buf(buf);
    log(debug, "exiting on_tcp_read_cb");
}

// libuv alloc callback used for reads: allocates a buffer base of
// `suggested_size` bytes and wraps it in a uv_buf_t of that length.
crust fn on_alloc_cb(handle: *libc::c_void,
                     ++suggested_size: size_t)
    -> uv::ll::uv_buf_t unsafe {
    log(debug, "tcp read on_alloc_cb!");
    let wanted_len = suggested_size as uint;
    let char_ptr = uv::ll::malloc_buf_base_of(suggested_size);
    log(debug, #fmt("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
                     handle,
                     char_ptr as uint,
                     wanted_len));
    uv::ll::buf_init(char_ptr, wanted_len)
}
1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119

// Data attached to a stream handle while it is being closed by
// tear_down_socket_data; tcp_socket_dtor_close_cb sends () on closed_ch.
type tcp_socket_close_data = {
    closed_ch: comm::chan<()>
};

// libuv close callback used when a tcp_socket is torn down: signals on
// the closed_ch stored in the handle's data slot.
crust fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let close_data_ptr = uv::ll::get_data_for_uv_handle(handle)
        as *tcp_socket_close_data;
    comm::send((*close_data_ptr).closed_ch, ());
    log(debug, "tcp_socket_dtor_close_cb exiting..");
}

// libuv write-completion callback: reports the outcome of a uv_write on
// the result_ch stored in the write request's data slot.
crust fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
                              status: libc::c_int) unsafe {
    let write_data_ptr = uv::ll::get_data_for_req(write_req)
        as *write_req_data;
    if status == 0i32 {
        log(debug, "successful write complete");
        comm::send((*write_data_ptr).result_ch, tcp_write_success);
    } else {
        // the error lives on the loop that owns the request's stream
        let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
            write_req);
        let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        log(debug, "failure to write");
        // tcp_write_error carries a tcp_err_data, so convert as the
        // submit-failure path in write_common_impl does
        comm::send((*write_data_ptr).result_ch,
                   tcp_write_error(err_data.to_tcp_err()));
    }
}

// Data attached to a uv_write_t request; tcp_write_complete_cb sends the
// write's outcome on result_ch.
type write_req_data = {
    result_ch: comm::chan<tcp_write_result>
};

J
Jeff Olson 已提交
1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186
// Data attached to a connect request (and, on failure, to the stream
// handle being closed).
type connect_req_data = {
    // tcp_connect_on_connect_cb reports success or failure here
    result_ch: comm::chan<conn_attempt>,
    // stream_error_close_cb sends () here once the failed stream is closed
    closed_signal_ch: comm::chan<()>
};

// libuv close callback used after a failed connect: signals on the
// closed_signal_ch stored in the handle's data slot.
crust fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let data = uv::ll::get_data_for_uv_handle(handle) as
        *connect_req_data;
    comm::send((*data).closed_signal_ch, ());
    log(debug, #fmt("exiting stream_error_close_cb for %?", handle));
}

// libuv close callback for a client handle; only logs that the handle
// was closed.
crust fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    log(debug, #fmt("closed client tcp handle %?", handle));
}

// libuv connect callback. Reports conn_success or conn_failure on the
// result_ch stored with the connect request; on failure it also closes
// the stream, with stream_error_close_cb as the close callback.
crust fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
                                   status: libc::c_int) unsafe {
    let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
                      as *connect_req_data);
    let result_ch = (*conn_data_ptr).result_ch;
    log(debug, #fmt("tcp_connect result_ch %?", result_ch));
    let tcp_stream_ptr =
        uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
    if status == 0i32 {
        log(debug, "successful tcp connection!");
        comm::send(result_ch, conn_success);
    }
    else {
        log(debug, "error in tcp_connect_on_connect_cb");
        let loop_ptr = uv::ll::get_loop_for_uv_handle(tcp_stream_ptr);
        let uv_err = uv::ll::get_last_err_data(loop_ptr);
        log(debug, #fmt("err_data %? %?", uv_err.err_name,
                        uv_err.err_msg));
        comm::send(result_ch, conn_failure(uv_err));
        // the close cb reads closed_signal_ch through the handle's data
        uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
                                       conn_data_ptr);
        uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
    }
    log(debug, "leaving tcp_connect_on_connect_cb");
}

// Outcome of a connect attempt as reported by tcp_connect_on_connect_cb;
// a failure carries the raw libuv error data.
enum conn_attempt {
    conn_success,
    conn_failure(uv::ll::uv_err_data)
}

// Per-socket state shared between a tcp_socket and the libuv callbacks.
type tcp_socket_data = {
    // receiving end for read results; handed out by read_start_common_impl
    reader_po: comm::port<result::result<[u8]/~, tcp_err_data>>,
    // on_tcp_read_cb sends received bytes or a read error here
    reader_ch: comm::chan<result::result<[u8]/~, tcp_err_data>>,
    // the libuv stream; freed by tear_down_socket_data
    stream_handle_ptr: *uv::ll::uv_tcp_t,
    connect_req: uv::ll::uv_connect_t,
    // request object that write_common_impl passes to uv::ll::write
    write_req: uv::ll::uv_write_t,
    // the iotask on which all libuv calls for this socket are made
    iotask: iotask
};

1195 1196 1197 1198 1199
// State behind a tcp_socket_buf: the wrapped socket plus a mutable byte
// buffer that the io::reader impl appends to and unshifts into.
type tcp_buffered_socket_data = {
    sock: tcp_socket,
    mut buf: [u8]
};

J
Jeff Olson 已提交
1200
// convert rust ip_addr to libuv's native representation
1201
fn ipv4_ip_addr_to_sockaddr_in(input_ip: ip::ip_addr,
J
Jeff Olson 已提交
1202
                               port: uint) -> uv::ll::sockaddr_in unsafe {
1203
    // FIXME (#2656): ipv6
1204 1205 1206 1207
    alt input_ip {
      ip::ipv4(_,_,_,_) {
        uv::ll::ip4_addr(ip::format_addr(input_ip), port as int)
      }
1208
      ip::ipv6(_,_,_,_,_,_,_,_) {
1209
        fail "FIXME (#2656) ipv6 not yet supported";
1210 1211
      }
    }
J
Jeff Olson 已提交
1212 1213
}

1214
//#[cfg(test)]
1215
mod test {
1216
    // FIXME don't run on fbsd or linux 32 bit (#2064)
1217 1218 1219 1220 1221 1222 1223 1224 1225 1226
    #[cfg(target_os="win32")]
    #[cfg(target_os="darwin")]
    #[cfg(target_os="linux")]
    mod tcp_ipv4_server_and_client_test {
        #[cfg(target_arch="x86_64")]
        mod impl64 {
            #[test]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1227
            #[test]
1228 1229 1230
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241
            #[test]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            // FIXME: this probably needs to be ignored on windows.
            // ... need to verify (someday we'll have 64bit windows! :)
            //#[ignore(cfg(target_os = "win32"))]
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
            }
1242 1243 1244 1245
            #[test]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
            }
1246

1247 1248 1249 1250 1251 1252 1253 1254
        }
        #[cfg(target_arch="x86")]
        mod impl32 {
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1255 1256
            #[test]
            #[ignore(cfg(target_os = "linux"))]
1257 1258 1259
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            // FIXME: this probably needs to be ignored on windows.
            // ... need to verify
            //#[ignore(cfg(target_os = "win32"))]
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
            }
1273 1274 1275 1276 1277
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
            }
1278
        }
1279
    }
1280
    fn impl_gl_tcp_ipv4_server_and_client() {
1281
        let hl_loop = uv::global_loop::get();
1282 1283
        let server_ip = "127.0.0.1";
        let server_port = 8888u;
1284 1285 1286 1287 1288
        let expected_req = "ping";
        let expected_resp = "pong";

        let server_result_po = comm::port::<str>();
        let server_result_ch = comm::chan(server_result_po);
1289 1290 1291

        let cont_po = comm::port::<()>();
        let cont_ch = comm::chan(cont_po);
1292 1293 1294 1295 1296 1297 1298
        // server
        task::spawn_sched(task::manual_threads(1u)) {||
            let actual_req = comm::listen {|server_ch|
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
1299
                    server_ch,
1300 1301
                    cont_ch,
                    hl_loop)
1302 1303 1304
            };
            server_result_ch.send(actual_req);
        };
1305
        comm::recv(cont_po);
1306 1307
        // client
        log(debug, "server started, firing up client..");
1308
        let actual_resp_result = comm::listen {|client_ch|
1309 1310 1311 1312
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1313 1314
                client_ch,
                hl_loop)
1315
        };
1316 1317
        assert actual_resp_result.is_success();
        let actual_resp = actual_resp_result.get();
1318 1319 1320 1321 1322 1323 1324
        let actual_req = comm::recv(server_result_po);
        log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, #fmt("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
1325
    }
1326 1327 1328
    fn impl_gl_tcp_ipv4_client_error_connection_refused() {
        let hl_loop = uv::global_loop::get();
        let server_ip = "127.0.0.1";
1329
        let server_port = 8889u;
1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348
        let expected_req = "ping";
        // client
        log(debug, "firing up client..");
        let actual_resp_result = comm::listen {|client_ch|
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
                client_ch,
                hl_loop)
        };
        alt actual_resp_result.get_err() {
          connection_refused {
          }
          _ {
            fail "unknown error.. expected connection_refused"
          }
        }
    }
1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418
    // With one server already listening on 127.0.0.1:8890, a second listen
    // on the same address must fail with address_in_use.
    fn impl_gl_tcp_ipv4_server_address_in_use() {
        let expected_req = "ping";
        let expected_resp = "pong";
        let server_ip = "127.0.0.1";
        let server_port = 8890u;
        let hl_loop = uv::global_loop::get();

        let req_po = comm::port::<str>();
        let req_ch = comm::chan(req_po);
        // signalled by the first server once it is listening
        let ready_po = comm::port::<()>();
        let ready_ch = comm::chan(ready_po);

        // server
        task::spawn_sched(task::manual_threads(1u)) {||
            let seen_req = comm::listen {|server_ch|
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
                    ready_ch,
                    hl_loop)
            };
            req_ch.send(seen_req);
        };
        comm::recv(ready_po);
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        // client.. just doing this so that the first server tears down
        log(debug, "server started, firing up client..");
        comm::listen {|client_ch|
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
                client_ch,
                hl_loop)
        };
        alt listen_err {
          address_in_use {
            assert true;
          }
          _ {
            fail "expected address_in_use listen error,"+
                      "but got a different error varient. check logs.";
          }
        }
    }
    // Listening on port 80 of 127.0.0.1 is expected to be refused with
    // access_denied; any other listen error fails the test.
    fn impl_gl_tcp_ipv4_server_access_denied() {
        let hl_loop = uv::global_loop::get();
        let server_ip = "127.0.0.1";
        let server_port = 80u;
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        alt listen_err {
          access_denied {
            assert true;
          }
          _ {
            fail "expected access_denied listen error, "+
                      "but got a different error variant. check logs.";
          }
        }
    }
1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485
    // Same ping/pong round trip as the plain client test, on port 8891,
    // but the client talks through a socket_buf used as io::writer and
    // io::reader.
    fn impl_gl_tcp_ipv4_server_client_reader_writer() {
        let expected_req = "ping";
        let expected_resp = "pong";
        let server_ip = "127.0.0.1";
        let server_port = 8891u;
        let iotask = uv::global_loop::get();

        let req_po = comm::port::<str>();
        let req_ch = comm::chan(req_po);
        // signalled by the server once it is listening
        let ready_po = comm::port::<()>();
        let ready_ch = comm::chan(ready_po);

        // server
        task::spawn_sched(task::manual_threads(1u)) {||
            let seen_req = comm::listen {|server_ch|
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
                    ready_ch,
                    iotask)
            };
            req_ch.send(seen_req);
        };
        comm::recv(ready_po);

        // client
        let server_addr = ip::v4::parse_addr(server_ip);
        let conn_result = connect(server_addr, server_port, iotask);
        if result::is_failure(conn_result) {
            assert false;
        }
        let sock_buf = @socket_buf(result::unwrap(conn_result));
        buf_write(sock_buf as io::writer, expected_req);

        // so contrived! the expected response is only used to work out
        // how many bytes to read back
        let actual_resp = str::as_bytes(expected_resp) {|resp_buf|
            let wanted = vec::len(resp_buf);
            buf_read(sock_buf as io::reader, wanted)
        };

        let actual_req = comm::recv(req_po);
        log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, #fmt("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
    }

    fn buf_write(+w: io::writer, val: str) {
        log(debug, #fmt("BUF_WRITE: val len %?", str::len(val)));
        str::as_slice(val, 0u, str::len(val) -1u) {|val_slice|
            str::byte_slice(val_slice) {|b_slice|
                log(debug, #fmt("BUF_WRITE: b_slice len %?",
                                vec::len(b_slice)));
                w.write(b_slice)
            }
        }
    }

    fn buf_read(+r: io::reader, len: uint) -> str {
        let new_bytes = r.read_bytes(len);
        log(debug, #fmt("in buf_read.. new_bytes len: %?",
                        vec::len(new_bytes)));
        str::from_bytes(new_bytes)
    }
1486

1487
    fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str,
1488
                          server_ch: comm::chan<str>,
1489
                          cont_ch: comm::chan<()>,
1490
                          iotask: iotask) -> str {
1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
            {|kill_ch|
                log(debug, #fmt("establish_cb %?",
                    kill_ch));
                comm::send(cont_ch, ());
            },
            // risky to run this on the loop, but some users
            // will want the POWER
            {|new_conn, kill_ch|
            log(debug, "SERVER: new connection!");
            comm::listen {|cont_ch|
                task::spawn_sched(task::manual_threads(1u)) {||
                    log(debug, "SERVER: starting worker for new req");

                    let accept_result = accept(new_conn);
                    log(debug, "SERVER: after accept()");
                    if result::is_err(accept_result) {
                        log(debug, "SERVER: error accept connection");
                        let err_data = result::get_err(accept_result);
                        comm::send(kill_ch, some(err_data));
                        log(debug,
                            "SERVER/WORKER: send on err cont ch");
                        cont_ch.send(());
                    }
                    else {
                        log(debug,
                            "SERVER/WORKER: send on cont ch");
                        cont_ch.send(());
                        let sock = result::unwrap(accept_result);
                        log(debug, "SERVER: successfully accepted"+
                            "connection!");
                        let received_req_bytes = read(sock, 0u);
                        alt received_req_bytes {
                          result::ok(data) {
1527 1528 1529
                            log(debug, "SERVER: got REQ str::from_bytes..");
                            log(debug, #fmt("SERVER: REQ data len: %?",
                                            vec::len(data)));
1530 1531 1532 1533 1534 1535 1536 1537 1538 1539
                            server_ch.send(
                                str::from_bytes(data));
                            log(debug, "SERVER: before write");
                            tcp_write_single(sock, str::bytes(resp));
                            log(debug, "SERVER: after write.. die");
                            comm::send(kill_ch, none);
                          }
                          result::err(err_data) {
                            log(debug, #fmt("SERVER: error recvd: %s %s",
                                err_data.err_name, err_data.err_msg));
1540
                            comm::send(kill_ch, some(err_data));
1541 1542
                            server_ch.send("");
                          }
1543
                        }
1544
                        log(debug, "SERVER: worker spinning down");
1545
                    }
1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564
                }
                log(debug, "SERVER: waiting to recv on cont_ch");
                cont_ch.recv()
            };
            log(debug, "SERVER: recv'd on cont_ch..leaving listen cb");
        });
        // err check on listen_result
        if result::is_err(listen_result) {
            alt result::get_err(listen_result) {
              generic_listen_err(name, msg) {
                fail #fmt("SERVER: exited abnormally name %s msg %s",
                                name, msg);
              }
              access_denied {
                fail "SERVER: exited abnormally, got access denied..";
              }
              address_in_use {
                fail "SERVER: exited abnormally, got address in use...";
              }
1565
            }
1566
        }
1567 1568 1569
        let ret_val = server_ch.recv();
        log(debug, #fmt("SERVER: exited and got ret val: '%s'", ret_val));
        ret_val
1570
    }
1571

1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593
    // Attempts a listen that is expected to fail and returns the listen
    // error. Fails the task if the listen succeeds or a connection
    // arrives.
    fn run_tcp_test_server_fail(server_ip: str, server_port: uint,
                          iotask: iotask) -> tcp_listen_err_data {
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
            {|kill_ch|
                log(debug, #fmt("establish_cb %?",
                    kill_ch));
            },
            {|new_conn, kill_ch|
                fail #fmt("SERVER: shouldn't be called.. %? %?",
                           new_conn, kill_ch);
        });
        // err check on listen_result
        if !result::is_failure(listen_result) {
            fail "SERVER: did not fail as expected"
        }
        result::get_err(listen_result)
    }

1594
    fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str,
1595
                          client_ch: comm::chan<str>,
1596 1597
                          iotask: iotask) -> result::result<str,
                                                    tcp_connect_err_data> {
1598
        let server_ip_addr = ip::v4::parse_addr(server_ip);
1599

1600
        log(debug, "CLIENT: starting..");
1601
        let connect_result = connect(server_ip_addr, server_port, iotask);
1602
        if result::is_err(connect_result) {
1603
            log(debug, "CLIENT: failed to connect");
1604
            let err_data = result::get_err(connect_result);
1605
            err(err_data)
1606
        }
1607 1608 1609 1610
        else {
            let sock = result::unwrap(connect_result);
            let resp_bytes = str::bytes(resp);
            tcp_write_single(sock, resp_bytes);
1611
            let read_result = sock.read(0u);
1612
            if read_result.is_err() {
1613
                log(debug, "CLIENT: failure to read");
1614
                ok("")
1615
            }
1616 1617 1618 1619 1620
            else {
                client_ch.send(str::from_bytes(read_result.get()));
                let ret_val = client_ch.recv();
                log(debug, #fmt("CLIENT: after client_ch recv ret: '%s'",
                   ret_val));
1621
                ok(ret_val)
1622 1623
            }
        }
1624 1625
    }

1626
    fn tcp_write_single(sock: tcp_socket, val: [u8]/~) {
1627 1628
        let write_result_future = sock.write_future(val);
        let write_result = write_result_future.get();
1629
        if result::is_err(write_result) {
1630
            log(debug, "tcp_write_single: write failed!");
1631
            let err_data = result::get_err(write_result);
1632 1633 1634 1635
            log(debug, #fmt("tcp_write_single err name: %s msg: %s",
                err_data.err_name, err_data.err_msg));
            // meh. torn on what to do here.
            fail "tcp_write_single failed";
1636
        }
1637
    }
1638
}