net_tcp.rs 58.3 KB
Newer Older
1 2 3 4
#[doc="
High-level interface to libuv's TCP functionality
"];

J
Jeff Olson 已提交
5
import ip = net_ip;
6 7
import uv::iotask;
import uv::iotask::iotask;
8
import comm::methods;
9
import future_spawn = future::spawn;
10
import future::extensions;
J
Jeff Olson 已提交
11 12
// FIXME #1935
// should be able to, but can't atm, replace w/ result::{result, extensions};
13 14 15
import result::*;
import libc::size_t;
import str::extensions;
J
Jeff Olson 已提交
16

17
// tcp interfaces
18
export tcp_socket;
19 20
// buffered socket
export tcp_socket_buf, socket_buf;
21 22
// errors
export tcp_err_data, tcp_connect_err_data;
23
// operations on a tcp_socket
24
export write, write_future, read_start, read_stop;
25
// tcp server stuff
26
export listen, accept;
27 28
// tcp client stuff
export connect;
J
Jeff Olson 已提交
29

30
#[nolink]
31
extern mod rustrt {
32
    fn rust_uv_current_kernel_malloc(size: libc::c_uint) -> *libc::c_void;
33
    fn rust_uv_current_kernel_free(mem: *libc::c_void);
34
    fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
35 36
}

37 38
#[doc="
Encapsulates an open TCP/IP connection through libuv
39

40 41 42
`tcp_socket` is non-copyable/sendable and automagically handles closing the
underlying libuv data structures when it goes out of scope. This is the
data structure that is used for read/write operations over a TCP stream.
43
"]
44 45 46 47
class tcp_socket {
  let socket_data: @tcp_socket_data;
  new(socket_data: @tcp_socket_data) { self.socket_data = socket_data; }
  drop {
48
    unsafe {
49
        tear_down_socket_data(self.socket_data)
50 51
    }
  }
J
Jeff Olson 已提交
52 53
}

54 55 56 57 58 59
#[doc="
A buffered wrapper for `net::tcp::tcp_socket`

It is created with a call to `net::tcp::socket_buf()` and has impls that
satisfy both the `io::reader` and `io::writer` ifaces.
"]
60 61 62
class tcp_socket_buf {
  let data: @tcp_buffered_socket_data;
  new(data: @tcp_buffered_socket_data) { self.data = data; }
63 64
}

65 66 67
#[doc="
Contains raw, string-based, error information returned from libuv
"]
68 69 70 71
type tcp_err_data = {
    err_name: str,
    err_msg: str
};
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
#[doc="
Details returned as part of a `result::err` result from `tcp::listen`
"]
enum tcp_listen_err_data {
    #[doc="
    Some unplanned-for error. The first and second fields correspond
    to libuv's `err_name` and `err_msg` fields, respectively.
    "]
    generic_listen_err(str, str),
    #[doc="
    Failed to bind to the requested IP/Port, because it is already in use.

    # Possible Causes

    * Attempting to bind to a port already bound to another listener
    "]
    address_in_use,
    #[doc="
    Request to bind to an IP/Port was denied by the system.

    # Possible Causes

    * Attemping to binding to an IP/Port as a non-Administrator
      on Windows Vista+
    * Attempting to bind, as a non-priv'd
      user, to 'privileged' ports (< 1024) on *nix
    "]
    access_denied
}
#[doc="
Details returned as part of a `result::err` result from `tcp::connect`
"]
104 105 106 107 108 109 110 111 112 113 114
enum tcp_connect_err_data {
    #[doc="
    Some unplanned-for error. The first and second fields correspond
    to libuv's `err_name` and `err_msg` fields, respectively.
    "]
    generic_connect_err(str, str),
    #[doc="
    Invalid IP or invalid port
    "]
    connection_refused
}
115

J
Jeff Olson 已提交
116 117 118 119 120
#[doc="
Initiate a client connection over TCP/IP

# Arguments

121
* `input_ip` - The IP address (versions 4 or 6) of the remote host
122
* `port` - the unsigned integer of the desired remote host port
123
* `iotask` - a `uv::iotask` that the tcp request will run on
J
Jeff Olson 已提交
124 125 126

# Returns

127 128 129 130
A `result` that, if the operation succeeds, contains a `net::net::tcp_socket`
that can be used to send and receive data to/from the remote host. In the
event of failure, a `net::tcp::tcp_connect_err_data` instance will be
returned
J
Jeff Olson 已提交
131
"]
132
fn connect(-input_ip: ip::ip_addr, port: uint,
133
           iotask: iotask)
134
    -> result::result<tcp_socket, tcp_connect_err_data> unsafe {
J
Jeff Olson 已提交
135 136 137 138 139 140 141
    let result_po = comm::port::<conn_attempt>();
    let closed_signal_po = comm::port::<()>();
    let conn_data = {
        result_ch: comm::chan(result_po),
        closed_signal_ch: comm::chan(closed_signal_po)
    };
    let conn_data_ptr = ptr::addr_of(conn_data);
142
    let reader_po = comm::port::<result::result<~[u8], tcp_err_data>>();
143 144 145
    let stream_handle_ptr = malloc_uv_tcp_t();
    *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
    let socket_data = @{
146 147
        reader_po: reader_po,
        reader_ch: comm::chan(reader_po),
148 149 150
        stream_handle_ptr: stream_handle_ptr,
        connect_req: uv::ll::connect_t(),
        write_req: uv::ll::write_t(),
151
        iotask: iotask
J
Jeff Olson 已提交
152
    };
153
    let socket_data_ptr = ptr::addr_of(*socket_data);
J
Jeff Olson 已提交
154 155 156
    log(debug, #fmt("tcp_connect result_ch %?", conn_data.result_ch));
    // get an unsafe representation of our stream_handle_ptr that
    // we can send into the interact cb to be handled in libuv..
157
    log(debug, #fmt("stream_handle_ptr outside interact %?",
158
        stream_handle_ptr));
B
Brian Anderson 已提交
159
    do iotask::interact(iotask) |loop_ptr| {
J
Jeff Olson 已提交
160
        log(debug, "in interact cb for tcp client connect..");
161
        log(debug, #fmt("stream_handle_ptr in interact %?",
J
Jeff Olson 已提交
162 163 164 165 166 167 168 169 170
            stream_handle_ptr));
        alt uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
          0i32 {
            log(debug, "tcp_init successful");
            alt input_ip {
              ipv4 {
                log(debug, "dealing w/ ipv4 connection..");
                let connect_req_ptr =
                    ptr::addr_of((*socket_data_ptr).connect_req);
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
                let addr_str = ip::format_addr(input_ip);
                let connect_result = alt input_ip {
                  ip::ipv4(addr) {
                    // have to "recreate" the sockaddr_in/6
                    // since the ip_addr discards the port
                    // info.. should probably add an additional
                    // rust type that actually is closer to
                    // what the libuv API expects (ip str + port num)
                    log(debug, #fmt("addr: %?", addr));
                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                    uv::ll::tcp_connect(
                        connect_req_ptr,
                        stream_handle_ptr,
                        ptr::addr_of(in_addr),
                        tcp_connect_on_connect_cb)
                  }
                  ip::ipv6(addr) {
                    log(debug, #fmt("addr: %?", addr));
                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                    uv::ll::tcp_connect6(
                        connect_req_ptr,
                        stream_handle_ptr,
                        ptr::addr_of(in_addr),
                        tcp_connect_on_connect_cb)
                  }
                };
                alt connect_result {
J
Jeff Olson 已提交
198 199 200 201 202
                  0i32 {
                    log(debug, "tcp_connect successful");
                    // reusable data that we'll have for the
                    // duration..
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
203 204
                                               socket_data_ptr as
                                                  *libc::c_void);
J
Jeff Olson 已提交
205 206 207 208 209 210 211 212 213 214 215 216 217
                    // just so the connect_cb can send the
                    // outcome..
                    uv::ll::set_data_for_req(connect_req_ptr,
                                             conn_data_ptr);
                    log(debug, "leaving tcp_connect interact cb...");
                    // let tcp_connect_on_connect_cb send on
                    // the result_ch, now..
                  }
                  _ {
                    // immediate connect failure.. probably a garbage
                    // ip or somesuch
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
                    comm::send((*conn_data_ptr).result_ch,
218
                               conn_failure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
219 220 221 222 223 224 225 226 227 228 229 230
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                                   conn_data_ptr);
                    uv::ll::close(stream_handle_ptr, stream_error_close_cb);
                  }
                }
              }
            }
        }
          _ {
            // failure to create a tcp handle
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send((*conn_data_ptr).result_ch,
231
                       conn_failure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
232 233 234 235 236 237
          }
        }
    };
    alt comm::recv(result_po) {
      conn_success {
        log(debug, "tcp::connect - received success on result_po");
238
        result::ok(tcp_socket(socket_data))
J
Jeff Olson 已提交
239 240 241 242
      }
      conn_failure(err_data) {
        comm::recv(closed_signal_po);
        log(debug, "tcp::connect - received failure on result_po");
243 244 245 246 247 248 249 250
        // still have to free the malloc'd stream handle..
        rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                           as *libc::c_void);
        let tcp_conn_err = alt err_data.err_name {
          "ECONNREFUSED" { connection_refused }
          _ { generic_connect_err(err_data.err_name, err_data.err_msg) }
        };
        result::err(tcp_conn_err)
J
Jeff Olson 已提交
251 252 253
      }
    }
}
254 255

#[doc="
256
Write binary data to a tcp stream; Blocks until operation completes
257 258 259 260

# Arguments

* sock - a `tcp_socket` to write to
261
* raw_write_data - a vector of `[u8]/~` that will be written to the stream.
262 263 264 265
This value must remain valid for the duration of the `write` call

# Returns

266 267
A `result` object with a `nil` value as the `ok` variant, or a `tcp_err_data`
value as the `err` variant
268
"]
269
fn write(sock: tcp_socket, raw_write_data: ~[u8])
270
    -> result::result<(), tcp_err_data> unsafe {
271
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
272 273 274 275 276 277 278 279
    write_common_impl(socket_data_ptr, raw_write_data)
}

#[doc="
Write binary data to tcp stream; Returns a `future::future` value immediately

# Safety

280 281 282 283 284 285
This function can produce unsafe results if:

1. the call to `write_future` is made
2. the `future::future` value returned is never resolved via
`future::get`
3. and then the `tcp_socket` passed in to `write_future` leaves
286
scope and is destructed before the task that runs the libuv write
287 288 289 290 291 292 293 294 295
operation completes.

As such: If using `write_future`, always be sure to resolve the returned
`future` so as to ensure libuv doesn't try to access a released write handle.
Otherwise, use the blocking `tcp::write` function instead.

# Arguments

* sock - a `tcp_socket` to write to
296
* raw_write_data - a vector of `[u8]/~` that will be written to the stream.
297 298 299 300 301 302 303 304
This value must remain valid for the duration of the `write` call

# Returns

A `future` value that, once the `write` operation completes, resolves to a
`result` object with a `nil` value as the `ok` variant, or a `tcp_err_data`
value as the `err` variant
"]
305
fn write_future(sock: tcp_socket, raw_write_data: ~[u8])
306
    -> future::future<result::result<(), tcp_err_data>> unsafe {
307
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
308
    do future_spawn {
309
        let data_copy = copy(raw_write_data);
310
        write_common_impl(socket_data_ptr, data_copy)
311
    }
312 313
}

314
#[doc="
315
Begin reading binary data from an open TCP connection; used with `read_stop`
316 317 318

# Arguments

319
* sock -- a `net::tcp::tcp_socket` for the connection to read from
320 321 322 323 324 325

# Returns

* A `result` instance that will either contain a
`comm::port<tcp_read_result>` that the user can read (and optionally, loop
on) from until `read_stop` is called, or a `tcp_err_data` record
326
"]
327 328
fn read_start(sock: tcp_socket)
    -> result::result<comm::port<
329
        result::result<~[u8], tcp_err_data>>, tcp_err_data> unsafe {
330
    let socket_data = ptr::addr_of(*(sock.socket_data));
331
    read_start_common_impl(socket_data)
332
}
333

334
#[doc="
335 336 337 338 339
Stop reading from an open TCP connection; used with `read_start`

# Arguments

* `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on
340
"]
341
fn read_stop(sock: tcp_socket,
342
             -read_port: comm::port<result::result<[u8]/~, tcp_err_data>>) ->
343
    result::result<(), tcp_err_data> unsafe {
344
    log(debug, #fmt("taking the read_port out of commission %?", read_port));
345
    let socket_data = ptr::addr_of(*sock.socket_data);
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
    read_stop_common_impl(socket_data)
}

#[doc="
Reads a single chunk of data from `tcp_socket`; block until data/error recv'd

Does a blocking read operation for a single chunk of data from a `tcp_socket`
until a data arrives or an error is received. The provided `timeout_msecs`
value is used to raise an error if the timeout period passes without any
data received.

# Arguments

* `sock` - a `net::tcp::tcp_socket` that you wish to read from
* `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
read attempt. Pass `0u` to wait indefinitely
"]
fn read(sock: tcp_socket, timeout_msecs: uint)
364
    -> result::result<~[u8],tcp_err_data> {
365
    let socket_data = ptr::addr_of(*(sock.socket_data));
366 367 368 369
    read_common_impl(socket_data, timeout_msecs)
}

#[doc="
370
Reads a single chunk of data; returns a `future::future<[u8]/~>` immediately
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397

Does a non-blocking read operation for a single chunk of data from a
`tcp_socket` and immediately returns a `future` value representing the
result. When resolving the returned `future`, it will block until data
arrives or an error is received. The provided `timeout_msecs`
value is used to raise an error if the timeout period passes without any
data received.

# Safety

This function can produce unsafe results if the call to `read_future` is
made, the `future::future` value returned is never resolved via
`future::get`, and then the `tcp_socket` passed in to `read_future` leaves
scope and is destructed before the task that runs the libuv read
operation completes.

As such: If using `read_future`, always be sure to resolve the returned
`future` so as to ensure libuv doesn't try to access a released read handle.
Otherwise, use the blocking `tcp::read` function instead.

# Arguments

* `sock` - a `net::tcp::tcp_socket` that you wish to read from
* `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
read attempt. Pass `0u` to wait indefinitely
"]
fn read_future(sock: tcp_socket, timeout_msecs: uint)
398
    -> future::future<result::result<~[u8],tcp_err_data>> {
399
    let socket_data = ptr::addr_of(*(sock.socket_data));
400
    do future_spawn {
401
        read_common_impl(socket_data, timeout_msecs)
402
    }
403
}
404

405 406 407 408 409
#[doc="
Bind an incoming client connection to a `net::tcp::tcp_socket`

# Notes

410 411 412
It is safe to call `net::tcp::accept` _only_ within the context of the
`new_connect_cb` callback provided as the final argument to the
`net::tcp::listen` function.
413

414 415 416
The `new_conn` opaque value is provided _only_ as the first argument to the
`new_connect_cb` provided as a part of `net::tcp::listen`.
It can be safely sent to another task but it _must_ be
417
used (via `net::tcp::accept`) before the `new_connect_cb` call it was
418
provided to returns.
419

420
This implies that a port/chan pair must be used to make sure that the
421 422 423
`new_connect_cb` call blocks until an attempt to create a
`net::tcp::tcp_socket` is completed.

424 425 426 427 428 429
# Example

Here, the `new_conn` is used in conjunction with `accept` from within
a task spawned by the `new_connect_cb` passed into `listen`

~~~~~~~~~~~
430 431 432 433 434 435 436 437 438
net::tcp::listen(remote_ip, remote_port, backlog)
    // this callback is ran once after the connection is successfully
    // set up
    {|kill_ch|
      // pass the kill_ch to your main loop or wherever you want
      // to be able to externally kill the server from
    }
    // this callback is ran when a new connection arrives
    {|new_conn, kill_ch|
439 440 441 442
    let cont_po = comm::port::<option<tcp_err_data>>();
    let cont_ch = comm::chan(cont_po);
    task::spawn {||
        let accept_result = net::tcp::accept(new_conn);
443
        if accept_result.is_err() {
444 445 446 447
            comm::send(cont_ch, result::get_err(accept_result));
            // fail?
        }
        else {
448 449 450 451 452 453 454 455 456 457 458 459 460 461
            let sock = result::get(accept_result);
            comm::send(cont_ch, true);
            // do work here
        }
    };
    alt comm::recv(cont_po) {
      // shut down listen()
      some(err_data) { comm::send(kill_chan, some(err_data)) }
      // wait for next connection
      none {}
    }
};
~~~~~~~~~~~

462 463 464 465 466 467
# Arguments

* `new_conn` - an opaque value used to create a new `tcp_socket`

# Returns

468 469 470 471 472
On success, this function will return a `net::tcp::tcp_socket` as the
`ok` variant of a `result`. The `net::tcp::tcp_socket` is anchored within
the task that `accept` was called within for its lifetime. On failure,
this function will return a `net::tcp::tcp_err_data` record
as the `err` variant of a `result`.
473 474 475 476 477 478 479
"]
fn accept(new_conn: tcp_new_connection)
    -> result::result<tcp_socket, tcp_err_data> unsafe {

    alt new_conn{
      new_tcp_conn(server_handle_ptr) {
        let server_data_ptr = uv::ll::get_data_for_uv_handle(
480
            server_handle_ptr) as *tcp_listen_fc_data;
481
        let reader_po = comm::port::<result::result<~[u8], tcp_err_data>>();
482
        let iotask = (*server_data_ptr).iotask;
483 484 485
        let stream_handle_ptr = malloc_uv_tcp_t();
        *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
        let client_socket_data = @{
486 487
            reader_po: reader_po,
            reader_ch: comm::chan(reader_po),
488
            stream_handle_ptr : stream_handle_ptr,
489 490
            connect_req : uv::ll::connect_t(),
            write_req : uv::ll::write_t(),
491
            iotask : iotask
492
        };
493 494 495
        let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
        let client_stream_handle_ptr =
            (*client_socket_data_ptr).stream_handle_ptr;
496 497 498

        let result_po = comm::port::<option<tcp_err_data>>();
        let result_ch = comm::chan(result_po);
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514

        // UNSAFE LIBUV INTERACTION BEGIN
        // .. normally this happens within the context of
        // a call to uv::hl::interact.. but we're breaking
        // the rules here because this always has to be
        // called within the context of a listen() new_connect_cb
        // callback (or it will likely fail and drown your cat)
        log(debug, "in interact cb for tcp::accept");
        let loop_ptr = uv::ll::get_loop_for_uv_handle(
            server_handle_ptr);
        alt uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
          0i32 {
            log(debug, "uv_tcp_init successful for client stream");
            alt uv::ll::accept(
                server_handle_ptr as *libc::c_void,
                client_stream_handle_ptr as *libc::c_void) {
515
              0i32 {
516 517
                log(debug, "successfully accepted client connection");
                uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
518 519
                                               client_socket_data_ptr
                                                   as *libc::c_void);
520
                comm::send(result_ch, none);
521 522
              }
              _ {
523
                log(debug, "failed to accept client conn");
524 525 526 527
                comm::send(result_ch, some(
                    uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
              }
            }
528 529 530 531 532 533 534 535
          }
          _ {
            log(debug, "failed to init client stream");
            comm::send(result_ch, some(
                uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
          }
        }
        // UNSAFE LIBUV INTERACTION END
536 537 538 539 540
        alt comm::recv(result_po) {
          some(err_data) {
            result::err(err_data)
          }
          none {
541
            result::ok(tcp_socket(client_socket_data))
542 543 544 545 546 547
          }
        }
      }
    }
}

548 549 550 551 552 553 554 555 556 557
#[doc="
Bind to a given IP/port and listen for new connections

# Arguments

* `host_ip` - a `net::ip::ip_addr` representing a unique IP
(versions 4 or 6)
* `port` - a uint representing the port to listen on
* `backlog` - a uint representing the number of incoming connections
to cache in memory
558
* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
559 560 561 562 563 564 565 566 567 568 569 570 571
* `on_establish_cb` - a callback that is evaluated if/when the listener
is successfully established. it takes no parameters
* `new_connect_cb` - a callback to be evaluated, on the libuv thread,
whenever a client attempts to conect on the provided ip/port. the
callback's arguments are:
    * `new_conn` - an opaque type that can be passed to
    `net::tcp::accept` in order to be converted to a `tcp_socket`.
    * `kill_ch` - channel of type `comm::chan<option<tcp_err_data>>`. this
    channel can be used to send a message to cause `listen` to begin
    closing the underlying libuv data structures.

# returns

572
a `result` instance containing empty data of type `()` on a
573
successful/normal shutdown, and a `tcp_listen_err_data` enum in the event
574
of listen exiting because of an error
575
"]
576
fn listen(-host_ip: ip::ip_addr, port: uint, backlog: uint,
577
          iotask: iotask,
578
          on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
579 580
          +new_connect_cb: fn~(tcp_new_connection,
                               comm::chan<option<tcp_err_data>>))
581
    -> result::result<(), tcp_listen_err_data> unsafe {
582
    do listen_common(host_ip, port, backlog, iotask, on_establish_cb)
583
        // on_connect_cb
584
        |handle| {
585 586 587 588 589 590 591 592
            let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
                as *tcp_listen_fc_data;
            let new_conn = new_tcp_conn(handle);
            let kill_ch = (*server_data_ptr).kill_ch;
            new_connect_cb(new_conn, kill_ch);
    }
}

593
fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint,
594 595 596
          iotask: iotask,
          on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
          -on_connect_cb: fn~(*uv::ll::uv_tcp_t))
597
    -> result::result<(), tcp_listen_err_data> unsafe {
598 599 600 601 602 603 604 605 606
    let stream_closed_po = comm::port::<()>();
    let kill_po = comm::port::<option<tcp_err_data>>();
    let kill_ch = comm::chan(kill_po);
    let server_stream = uv::ll::tcp_t();
    let server_stream_ptr = ptr::addr_of(server_stream);
    let server_data = {
        server_stream_ptr: server_stream_ptr,
        stream_closed_ch: comm::chan(stream_closed_po),
        kill_ch: kill_ch,
607
        on_connect_cb: on_connect_cb,
608
        iotask: iotask,
609 610 611 612
        mut active: true
    };
    let server_data_ptr = ptr::addr_of(server_data);

613
    let setup_result = do comm::listen |setup_ch| {
J
Jeff Olson 已提交
614
        // this is to address a compiler warning about
615 616 617 618 619 620
        // an implicit copy.. it seems that double nested
        // will defeat a move sigil, as is done to the host_ip
        // arg above.. this same pattern works w/o complaint in
        // tcp::connect (because the iotask::interact cb isn't
        // nested within a comm::listen block)
        let loc_ip = copy(host_ip);
621
        do iotask::interact(iotask) |loop_ptr| {
622
            alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
623
              0i32 {
624 625 626
                uv::ll::set_data_for_uv_handle(
                    server_stream_ptr,
                    server_data_ptr);
627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
                let addr_str = ip::format_addr(loc_ip);
                let bind_result = alt loc_ip {
                  ip::ipv4(addr) {
                    log(debug, #fmt("addr: %?", addr));
                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                    uv::ll::tcp_bind(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
                  ip::ipv6(addr) {
                    log(debug, #fmt("addr: %?", addr));
                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                    uv::ll::tcp_bind6(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
                };
                alt bind_result {
643
                  0i32 {
644 645 646 647 648 649 650 651 652 653 654 655
                    alt uv::ll::listen(server_stream_ptr,
                                       backlog as libc::c_int,
                                       tcp_lfc_on_connection_cb) {
                      0i32 {
                        comm::send(setup_ch, none);
                      }
                      _ {
                        log(debug, "failure to uv_listen()");
                        let err_data = uv::ll::get_last_err_data(loop_ptr);
                        comm::send(setup_ch, some(err_data));
                      }
                    }
656 657
                  }
                  _ {
658
                    log(debug, "failure to uv_tcp_bind");
659 660 661 662 663 664
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
                    comm::send(setup_ch, some(err_data));
                  }
                }
              }
              _ {
665
                log(debug, "failure to uv_tcp_init");
666 667 668 669
                let err_data = uv::ll::get_last_err_data(loop_ptr);
                comm::send(setup_ch, some(err_data));
              }
            }
670 671
        };
        setup_ch.recv()
672
    };
673
    alt setup_result {
674
      some(err_data) {
675
        do iotask::interact(iotask) |loop_ptr| {
676 677 678 679 680 681 682 683 684 685 686 687 688 689
            log(debug, #fmt("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
        stream_closed_po.recv();
        alt err_data.err_name {
          "EACCES" {
            log(debug, "Got EACCES error");
            result::err(access_denied)
          }
          "EADDRINUSE" {
            log(debug, "Got EADDRINUSE error");
            result::err(address_in_use)
690 691
          }
          _ {
692 693 694 695
            log(debug, #fmt("Got '%s' '%s' libuv error",
                            err_data.err_name, err_data.err_msg));
            result::err(
                generic_listen_err(err_data.err_name, err_data.err_msg))
696 697 698 699 700
          }
        }
      }
      none {
        on_establish_cb(kill_ch);
N
Niko Matsakis 已提交
701
        let kill_result = comm::recv(kill_po);
B
Brian Anderson 已提交
702
        do iotask::interact(iotask) |loop_ptr| {
703 704 705 706 707
            log(debug, #fmt("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
708
        stream_closed_po.recv();
709 710 711
        alt kill_result {
          // some failure post bind/listen
          some(err_data) {
712 713
            result::err(generic_listen_err(err_data.err_name,
                                            err_data.err_msg))
714 715 716 717 718 719 720 721 722 723
          }
          // clean exit
          none {
            result::ok(())
          }
        }
      }
    }
}

724
#[doc="
725 726 727 728 729 730 731 732 733 734 735 736 737
Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`.

This function takes ownership of a `net::tcp::tcp_socket`, returning it
stored within a buffered wrapper, which can be converted to a `io::reader`
or `io::writer`

# Arguments

* `sock` -- a `net::tcp::tcp_socket` that you want to buffer

# Returns

A buffered wrapper that you can cast as an `io::reader` or `io::writer`
738
"]
739
fn socket_buf(-sock: tcp_socket) -> tcp_socket_buf {
740
    tcp_socket_buf(@{ sock: sock, mut buf: []/~ })
741 742 743 744 745
}

#[doc="
Convenience methods extending `net::tcp::tcp_socket`
"]
746
impl tcp_socket for tcp_socket {
747
    fn read_start() -> result::result<comm::port<
748
        result::result<~[u8], tcp_err_data>>, tcp_err_data> {
749 750
        read_start(self)
    }
751
    fn read_stop(-read_port:
752
                 comm::port<result::result<[u8]/~, tcp_err_data>>) ->
753
        result::result<(), tcp_err_data> {
754
        read_stop(self, read_port)
755
    }
756
    fn read(timeout_msecs: uint) ->
757
        result::result<~[u8], tcp_err_data> {
758 759 760
        read(self, timeout_msecs)
    }
    fn read_future(timeout_msecs: uint) ->
761
        future::future<result::result<~[u8], tcp_err_data>> {
762 763
        read_future(self, timeout_msecs)
    }
764
    fn write(raw_write_data: ~[u8])
765 766 767
        -> result::result<(), tcp_err_data> {
        write(self, raw_write_data)
    }
768
    fn write_future(raw_write_data: ~[u8])
769
        -> future::future<result::result<(), tcp_err_data>> {
770 771
        write_future(self, raw_write_data)
    }
772
}
773 774 775 776

#[doc="
Implementation of `io::reader` iface for a buffered `net::tcp::tcp_socket`
"]
777
impl tcp_socket_buf of io::reader for @tcp_socket_buf {
778
    fn read_bytes(amt: uint) -> [u8]/~ {
779
        let has_amt_available =
780
            vec::len((*(self.data)).buf) >= amt;
781 782
        if has_amt_available {
            // no arbitrary-length shift in vec::?
783
            let mut ret_buf = []/~;
784
            while vec::len(ret_buf) < amt {
785
                ret_buf += [vec::shift((*(self.data)).buf)]/~;
786 787 788 789
            }
            ret_buf
        }
        else {
790
            let read_result = read((*(self.data)).sock, 0u);
791
            if read_result.is_err() {
792 793 794
                let err_data = read_result.get_err();
                log(debug, #fmt("ERROR sock_buf as io::reader.read err %? %?",
                                 err_data.err_name, err_data.err_msg));
795
                []/~
796 797 798
            }
            else {
                let new_chunk = result::unwrap(read_result);
799
                (*(self.data)).buf += new_chunk;
800 801 802 803 804 805 806 807
                self.read_bytes(amt)
            }
        }
    }
    fn read_byte() -> int {
        self.read_bytes(1u)[0] as int
    }
    fn unread_byte(amt: int) {
808
        vec::unshift((*(self.data)).buf, amt as u8);
809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824
    }
    fn eof() -> bool {
        false // noop
    }
    fn seek(dist: int, seek: io::seek_style) {
        log(debug, #fmt("tcp_socket_buf seek stub %? %?", dist, seek));
        // noop
    }
    fn tell() -> uint {
        0u // noop
    }
}

#[doc="
Implementation of `io::reader` iface for a buffered `net::tcp::tcp_socket`
"]
825
impl tcp_socket_buf of io::writer for @tcp_socket_buf {
826
    fn write(data: [const u8]/&) unsafe {
827 828
        let socket_data_ptr =
            ptr::addr_of(*((*(self.data)).sock).socket_data);
829 830
        let w_result = write_common_impl(socket_data_ptr,
                                        vec::slice(data, 0, vec::len(data)));
831
        if w_result.is_err() {
832 833 834 835
            let err_data = w_result.get_err();
            log(debug, #fmt("ERROR sock_buf as io::writer.writer err: %? %?",
                             err_data.err_name, err_data.err_msg));
        }
836 837 838 839 840 841 842 843 844 845 846 847 848
    }
    fn seek(dist: int, seek: io::seek_style) {
      log(debug, #fmt("tcp_socket_buf seek stub %? %?", dist, seek));
        // noop
    }
    fn tell() -> uint {
        0u
    }
    fn flush() -> int {
        0
    }
}

849 850
// INTERNAL API

851 852 853 854 855 856 857 858
fn tear_down_socket_data(socket_data: @tcp_socket_data) unsafe {
    let closed_po = comm::port::<()>();
    let closed_ch = comm::chan(closed_po);
    let close_data = {
        closed_ch: closed_ch
    };
    let close_data_ptr = ptr::addr_of(close_data);
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
859
    do iotask::interact((*socket_data).iotask) |loop_ptr| {
860 861 862 863 864 865 866 867 868 869 870 871 872
        log(debug, #fmt("interact dtor for tcp_socket stream %? loop %?",
            stream_handle_ptr, loop_ptr));
        uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                       close_data_ptr);
        uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
    };
    comm::recv(closed_po);
    log(debug, #fmt("about to free socket_data at %?", socket_data));
    rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                       as *libc::c_void);
    log(debug, "exiting dtor for tcp_socket");
}

873 874
// shared implementation for tcp::read
fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
875
    -> result::result<~[u8],tcp_err_data> unsafe {
876
    log(debug, "starting tcp::read");
877
    let iotask = (*socket_data).iotask;
878
    let rs_result = read_start_common_impl(socket_data);
879
    if result::is_err(rs_result) {
880 881 882 883 884 885 886
        let err_data = result::get_err(rs_result);
        result::err(err_data)
    }
    else {
        log(debug, "tcp::read before recv_timeout");
        let read_result = if timeout_msecs > 0u {
            timer::recv_timeout(
887
               iotask, timeout_msecs, result::get(rs_result))
888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916
        } else {
            some(comm::recv(result::get(rs_result)))
        };
        log(debug, "tcp::read after recv_timeout");
        alt read_result {
          none {
            log(debug, "tcp::read: timed out..");
            let err_data = {
                err_name: "TIMEOUT",
                err_msg: "req timed out"
            };
            read_stop_common_impl(socket_data);
            result::err(err_data)
          }
          some(data_result) {
            log(debug, "tcp::read got data");
            read_stop_common_impl(socket_data);
            data_result
          }
        }
    }
}

// shared impl for read_stop
fn read_stop_common_impl(socket_data: *tcp_socket_data) ->
    result::result<(), tcp_err_data> unsafe {
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
    let stop_po = comm::port::<option<tcp_err_data>>();
    let stop_ch = comm::chan(stop_po);
B
Brian Anderson 已提交
917
    do iotask::interact((*socket_data).iotask) |loop_ptr| {
918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943
        log(debug, "in interact cb for tcp::read_stop");
        alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
          0i32 {
            log(debug, "successfully called uv_read_stop");
            comm::send(stop_ch, none);
          }
          _ {
            log(debug, "failure in calling uv_read_stop");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send(stop_ch, some(err_data.to_tcp_err()));
          }
        }
    };
    alt comm::recv(stop_po) {
      some(err_data) {
        result::err(err_data.to_tcp_err())
      }
      none {
        result::ok(())
      }
    }
}

// shared impl for read_start
fn read_start_common_impl(socket_data: *tcp_socket_data)
    -> result::result<comm::port<
944
        result::result<~[u8], tcp_err_data>>, tcp_err_data> unsafe {
945 946 947 948
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
    let start_po = comm::port::<option<uv::ll::uv_err_data>>();
    let start_ch = comm::chan(start_po);
    log(debug, "in tcp::read_start before interact loop");
B
Brian Anderson 已提交
949
    do iotask::interact((*socket_data).iotask) |loop_ptr| {
950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974
        log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
        alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
                               on_alloc_cb,
                               on_tcp_read_cb) {
          0i32 {
            log(debug, "success doing uv_read_start");
            comm::send(start_ch, none);
          }
          _ {
            log(debug, "error attempting uv_read_start");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send(start_ch, some(err_data));
          }
        }
    };
    alt comm::recv(start_po) {
      some(err_data) {
        result::err(err_data.to_tcp_err())
      }
      none {
        result::ok((*socket_data).reader_po)
      }
    }
}

975 976
// helper to convert a "class" vector of [u8] to a *[uv::ll::uv_buf_t]

977 978
// shared implementation used by write and write_future
fn write_common_impl(socket_data_ptr: *tcp_socket_data,
979
                     raw_write_data: ~[u8])
980 981 982 983
    -> result::result<(), tcp_err_data> unsafe {
    let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
    let stream_handle_ptr =
        (*socket_data_ptr).stream_handle_ptr;
984
    let write_buf_vec =  ~[ uv::ll::buf_init(
985
        vec::unsafe::to_ptr(raw_write_data),
986
        vec::len(raw_write_data)) ];
987 988 989 990 991 992
    let write_buf_vec_ptr = ptr::addr_of(write_buf_vec);
    let result_po = comm::port::<tcp_write_result>();
    let write_data = {
        result_ch: comm::chan(result_po)
    };
    let write_data_ptr = ptr::addr_of(write_data);
B
Brian Anderson 已提交
993
    do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| {
994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010
        log(debug, #fmt("in interact cb for tcp::write %?", loop_ptr));
        alt uv::ll::write(write_req_ptr,
                          stream_handle_ptr,
                          write_buf_vec_ptr,
                          tcp_write_complete_cb) {
          0i32 {
            log(debug, "uv_write() invoked successfully");
            uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
          }
          _ {
            log(debug, "error invoking uv_write()");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send((*write_data_ptr).result_ch,
                       tcp_write_error(err_data.to_tcp_err()));
          }
        }
    };
1011 1012 1013 1014
    // FIXME (#2656): Instead of passing unsafe pointers to local data,
    // and waiting here for the write to complete, we should transfer
    // ownership of everything to the I/O task and let it deal with the
    // aftermath, so we don't have to sit here blocking.
1015 1016 1017 1018 1019 1020
    alt comm::recv(result_po) {
      tcp_write_success { result::ok(()) }
      tcp_write_error(err_data) { result::err(err_data.to_tcp_err()) }
    }
}

1021 1022 1023 1024
enum tcp_new_connection {
    new_tcp_conn(*uv::ll::uv_tcp_t)
}

1025
type tcp_listen_fc_data = {
1026 1027 1028
    server_stream_ptr: *uv::ll::uv_tcp_t,
    stream_closed_ch: comm::chan<()>,
    kill_ch: comm::chan<option<tcp_err_data>>,
1029
    on_connect_cb: fn~(*uv::ll::uv_tcp_t),
1030
    iotask: iotask,
1031 1032 1033
    mut active: bool
};

G
Graydon Hoare 已提交
1034
extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
1035
    let server_data_ptr = uv::ll::get_data_for_uv_handle(
1036
        handle) as *tcp_listen_fc_data;
1037 1038 1039
    comm::send((*server_data_ptr).stream_closed_ch, ());
}

G
Graydon Hoare 已提交
1040
extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
1041 1042
                                     status: libc::c_int) unsafe {
    let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
1043
        as *tcp_listen_fc_data;
1044
    let kill_ch = (*server_data_ptr).kill_ch;
1045
    if (*server_data_ptr).active {
1046 1047
        alt status {
          0i32 {
1048
            (*server_data_ptr).on_connect_cb(handle);
1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060
          }
          _ {
            let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
            comm::send(kill_ch,
                       some(uv::ll::get_last_err_data(loop_ptr)
                            .to_tcp_err()));
            (*server_data_ptr).active = false;
          }
        }
    }
}

1061
fn malloc_uv_tcp_t() -> *uv::ll::uv_tcp_t unsafe {
1062
    rustrt::rust_uv_current_kernel_malloc(
1063
        rustrt::rust_uv_helper_uv_tcp_t_size()) as *uv::ll::uv_tcp_t
1064 1065
}

1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081
enum tcp_connect_result {
    tcp_connected(tcp_socket),
    tcp_connect_error(tcp_err_data)
}

enum tcp_write_result {
    tcp_write_success,
    tcp_write_error(tcp_err_data)
}

enum tcp_read_start_result {
    tcp_read_start_success(comm::port<tcp_read_result>),
    tcp_read_start_error(tcp_err_data)
}

enum tcp_read_result {
1082
    tcp_read_data(~[u8]),
1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096
    tcp_read_done,
    tcp_read_err(tcp_err_data)
}

iface to_tcp_err_iface {
    fn to_tcp_err() -> tcp_err_data;
}

impl of to_tcp_err_iface for uv::ll::uv_err_data {
    fn to_tcp_err() -> tcp_err_data {
        { err_name: self.err_name, err_msg: self.err_msg }
    }
}

G
Graydon Hoare 已提交
1097
extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
1098 1099
                    nread: libc::ssize_t,
                    ++buf: uv::ll::uv_buf_t) unsafe {
1100 1101
    log(debug, #fmt("entering on_tcp_read_cb stream: %? nread: %?",
                    stream, nread));
1102 1103 1104
    let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
    let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
        as *tcp_socket_data;
1105
    alt nread as int {
1106 1107
      // incoming err.. probably eof
      -1 {
1108 1109 1110 1111 1112
        let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
        log(debug, #fmt("on_tcp_read_cb: incoming err.. name %? msg %?",
                        err_data.err_name, err_data.err_msg));
        let reader_ch = (*socket_data_ptr).reader_ch;
        comm::send(reader_ch, result::err(err_data));
1113 1114 1115 1116 1117 1118
      }
      // do nothing .. unneeded buf
      0 {}
      // have data
      _ {
        // we have data
1119
        log(debug, #fmt("tcp on_read_cb nread: %d", nread as int));
1120
        let reader_ch = (*socket_data_ptr).reader_ch;
1121 1122
        let buf_base = uv::ll::get_base_from_buf(buf);
        let buf_len = uv::ll::get_len_from_buf(buf);
1123
        let new_bytes = vec::unsafe::from_buf(buf_base, buf_len as uint);
1124
        comm::send(reader_ch, result::ok(new_bytes));
1125 1126 1127
      }
    }
    uv::ll::free_base_of_buf(buf);
1128
    log(debug, "exiting on_tcp_read_cb");
1129 1130
}

G
Graydon Hoare 已提交
1131
extern fn on_alloc_cb(handle: *libc::c_void,
1132
                     ++suggested_size: size_t)
1133 1134 1135 1136 1137 1138 1139
    -> uv::ll::uv_buf_t unsafe {
    log(debug, "tcp read on_alloc_cb!");
    let char_ptr = uv::ll::malloc_buf_base_of(suggested_size);
    log(debug, #fmt("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
                     handle,
                     char_ptr as uint,
                     suggested_size as uint));
1140
    uv::ll::buf_init(char_ptr, suggested_size as uint)
1141
}
1142 1143 1144 1145 1146

type tcp_socket_close_data = {
    closed_ch: comm::chan<()>
};

G
Graydon Hoare 已提交
1147
extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
1148 1149 1150 1151 1152 1153 1154
    let data = uv::ll::get_data_for_uv_handle(handle)
        as *tcp_socket_close_data;
    let closed_ch = (*data).closed_ch;
    comm::send(closed_ch, ());
    log(debug, "tcp_socket_dtor_close_cb exiting..");
}

G
Graydon Hoare 已提交
1155
extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
1156 1157 1158
                              status: libc::c_int) unsafe {
    let write_data_ptr = uv::ll::get_data_for_req(write_req)
        as *write_req_data;
1159
    if status == 0i32 {
1160 1161
        log(debug, "successful write complete");
        comm::send((*write_data_ptr).result_ch, tcp_write_success);
1162
    } else {
1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175
        let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
            write_req);
        let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        log(debug, "failure to write");
        comm::send((*write_data_ptr).result_ch, tcp_write_error(err_data));
    }
}

type write_req_data = {
    result_ch: comm::chan<tcp_write_result>
};

J
Jeff Olson 已提交
1176 1177 1178 1179 1180
type connect_req_data = {
    result_ch: comm::chan<conn_attempt>,
    closed_signal_ch: comm::chan<()>
};

G
Graydon Hoare 已提交
1181
extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
J
Jeff Olson 已提交
1182 1183 1184 1185 1186 1187
    let data = uv::ll::get_data_for_uv_handle(handle) as
        *connect_req_data;
    comm::send((*data).closed_signal_ch, ());
    log(debug, #fmt("exiting steam_error_close_cb for %?", handle));
}

G
Graydon Hoare 已提交
1188
extern fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
J
Jeff Olson 已提交
1189 1190 1191
    log(debug, #fmt("closed client tcp handle %?", handle));
}

G
Graydon Hoare 已提交
1192
extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
J
Jeff Olson 已提交
1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225
                                   status: libc::c_int) unsafe {
    let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
                      as *connect_req_data);
    let result_ch = (*conn_data_ptr).result_ch;
    log(debug, #fmt("tcp_connect result_ch %?", result_ch));
    let tcp_stream_ptr =
        uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
    alt status {
      0i32 {
        log(debug, "successful tcp connection!");
        comm::send(result_ch, conn_success);
      }
      _ {
        log(debug, "error in tcp_connect_on_connect_cb");
        let loop_ptr = uv::ll::get_loop_for_uv_handle(tcp_stream_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        log(debug, #fmt("err_data %? %?", err_data.err_name,
                        err_data.err_msg));
        comm::send(result_ch, conn_failure(err_data));
        uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
                                       conn_data_ptr);
        uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
      }
    }
    log(debug, "leaving tcp_connect_on_connect_cb");
}

enum conn_attempt {
    conn_success,
    conn_failure(uv::ll::uv_err_data)
}

type tcp_socket_data = {
1226 1227
    reader_po: comm::port<result::result<~[u8], tcp_err_data>>,
    reader_ch: comm::chan<result::result<~[u8], tcp_err_data>>,
1228
    stream_handle_ptr: *uv::ll::uv_tcp_t,
J
Jeff Olson 已提交
1229
    connect_req: uv::ll::uv_connect_t,
1230
    write_req: uv::ll::uv_write_t,
1231
    iotask: iotask
J
Jeff Olson 已提交
1232 1233
};

1234 1235
type tcp_buffered_socket_data = {
    sock: tcp_socket,
1236
    mut buf: [u8]/~
1237
};
J
Jeff Olson 已提交
1238

1239
//#[cfg(test)]
1240
mod test {
1241
    // FIXME don't run on fbsd or linux 32 bit (#2064)
1242 1243 1244 1245 1246 1247 1248 1249 1250 1251
    #[cfg(target_os="win32")]
    #[cfg(target_os="darwin")]
    #[cfg(target_os="linux")]
    mod tcp_ipv4_server_and_client_test {
        #[cfg(target_arch="x86_64")]
        mod impl64 {
            #[test]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1252
            #[test]
1253 1254 1255
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1256 1257 1258 1259 1260 1261 1262
            #[test]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
1263
            }
1264 1265 1266 1267
            #[test]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
            }
1268

1269 1270 1271 1272 1273 1274 1275 1276
        }
        #[cfg(target_arch="x86")]
        mod impl32 {
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
1277 1278
            #[test]
            #[ignore(cfg(target_os = "linux"))]
1279 1280 1281
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
1282 1283 1284 1285 1286 1287 1288
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            #[ignore(cfg(target_os = "linux"))]
1289
            #[ignore(cfg(windows), reason = "deadlocking bots")]
1290 1291 1292
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
            }
1293 1294 1295 1296
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
1297
            }
1298
        }
1299
    }
1300
    fn impl_gl_tcp_ipv4_server_and_client() {
1301
        let hl_loop = uv::global_loop::get();
1302 1303
        let server_ip = "127.0.0.1";
        let server_port = 8888u;
1304 1305 1306 1307 1308
        let expected_req = "ping";
        let expected_resp = "pong";

        let server_result_po = comm::port::<str>();
        let server_result_ch = comm::chan(server_result_po);
1309 1310 1311

        let cont_po = comm::port::<()>();
        let cont_ch = comm::chan(cont_po);
1312
        // server
1313
        do task::spawn_sched(task::manual_threads(1u)) {
B
Brian Anderson 已提交
1314
            let actual_req = do comm::listen |server_ch| {
1315 1316 1317 1318
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
1319
                    server_ch,
1320 1321
                    cont_ch,
                    hl_loop)
1322 1323 1324
            };
            server_result_ch.send(actual_req);
        };
1325
        comm::recv(cont_po);
1326 1327
        // client
        log(debug, "server started, firing up client..");
1328
        let actual_resp_result = do comm::listen |client_ch| {
1329 1330 1331 1332
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1333 1334
                client_ch,
                hl_loop)
1335
        };
1336
        assert actual_resp_result.is_ok();
1337
        let actual_resp = actual_resp_result.get();
1338 1339 1340 1341 1342 1343 1344
        let actual_req = comm::recv(server_result_po);
        log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, #fmt("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
1345
    }
1346
    fn impl_gl_tcp_ipv4_client_error_connection_refused() {
1347
        let hl_loop = uv::global_loop::get();
1348 1349 1350
        let server_ip = "127.0.0.1";
        let server_port = 8889u;
        let expected_req = "ping";
1351 1352
        // client
        log(debug, "firing up client..");
1353
        let actual_resp_result = do comm::listen |client_ch| {
1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
                client_ch,
                hl_loop)
        };
        alt actual_resp_result.get_err() {
          connection_refused {
          }
          _ {
            fail "unknown error.. expected connection_refused"
          }
        }
    }
1369 1370 1371 1372 1373
    fn impl_gl_tcp_ipv4_server_address_in_use() {
        let hl_loop = uv::global_loop::get();
        let server_ip = "127.0.0.1";
        let server_port = 8890u;
        let expected_req = "ping";
1374 1375 1376 1377 1378 1379 1380 1381
        let expected_resp = "pong";

        let server_result_po = comm::port::<str>();
        let server_result_ch = comm::chan(server_result_po);

        let cont_po = comm::port::<()>();
        let cont_ch = comm::chan(cont_po);
        // server
1382
        do task::spawn_sched(task::manual_threads(1u)) {
B
Brian Anderson 已提交
1383
            let actual_req = do comm::listen |server_ch| {
1384
                run_tcp_test_server(
1385 1386 1387 1388
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
1389 1390
                    cont_ch,
                    hl_loop)
1391 1392 1393 1394
            };
            server_result_ch.send(actual_req);
        };
        comm::recv(cont_po);
1395 1396 1397 1398 1399 1400
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        // client.. just doing this so that the first server tears down
1401
        log(debug, "server started, firing up client..");
1402
        do comm::listen |client_ch| {
1403 1404 1405 1406
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1407 1408
                client_ch,
                hl_loop)
1409
        };
1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438
        alt listen_err {
          address_in_use {
            assert true;
          }
          _ {
            fail "expected address_in_use listen error,"+
                      "but got a different error varient. check logs.";
          }
        }
    }
    fn impl_gl_tcp_ipv4_server_access_denied() {
        let hl_loop = uv::global_loop::get();
        let server_ip = "127.0.0.1";
        let server_port = 80u;
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        alt listen_err {
          access_denied {
            assert true;
          }
          _ {
            fail "expected address_in_use listen error,"+
                      "but got a different error varient. check logs.";
          }
        }
    }
1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451
    fn impl_gl_tcp_ipv4_server_client_reader_writer() {
        let iotask = uv::global_loop::get();
        let server_ip = "127.0.0.1";
        let server_port = 8891u;
        let expected_req = "ping";
        let expected_resp = "pong";

        let server_result_po = comm::port::<str>();
        let server_result_ch = comm::chan(server_result_po);

        let cont_po = comm::port::<()>();
        let cont_ch = comm::chan(cont_po);
        // server
1452
        do task::spawn_sched(task::manual_threads(1u)) {
1453
            let actual_req = do comm::listen |server_ch| {
1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
                    cont_ch,
                    iotask)
            };
            server_result_ch.send(actual_req);
        };
        comm::recv(cont_po);
        // client
        let server_addr = ip::v4::parse_addr(server_ip);
        let conn_result = connect(server_addr, server_port, iotask);
1468
        if result::is_err(conn_result) {
1469 1470 1471 1472 1473 1474
            assert false;
        }
        let sock_buf = @socket_buf(result::unwrap(conn_result));
        buf_write(sock_buf as io::writer, expected_req);

        // so contrived!
1475
        let actual_resp = do str::as_bytes(expected_resp) |resp_buf| {
1476 1477 1478
            buf_read(sock_buf as io::reader,
                     vec::len(resp_buf))
        };
1479

1480 1481 1482 1483 1484 1485 1486 1487
        let actual_req = comm::recv(server_result_po);
        log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, #fmt("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
    }
1488

1489 1490
    fn buf_write(+w: io::writer, val: str) {
        log(debug, #fmt("BUF_WRITE: val len %?", str::len(val)));
1491
        do str::byte_slice(val) |b_slice| {
1492 1493 1494
            log(debug, #fmt("BUF_WRITE: b_slice len %?",
                            vec::len(b_slice)));
            w.write(b_slice)
1495 1496 1497 1498 1499 1500 1501 1502 1503
        }
    }

    fn buf_read(+r: io::reader, len: uint) -> str {
        let new_bytes = r.read_bytes(len);
        log(debug, #fmt("in buf_read.. new_bytes len: %?",
                        vec::len(new_bytes)));
        str::from_bytes(new_bytes)
    }
1504

1505
    fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str,
1506
                          server_ch: comm::chan<str>,
1507
                          cont_ch: comm::chan<()>,
1508
                          iotask: iotask) -> str {
1509 1510 1511
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1512
            |kill_ch| {
1513 1514 1515 1516 1517 1518
                log(debug, #fmt("establish_cb %?",
                    kill_ch));
                comm::send(cont_ch, ());
            },
            // risky to run this on the loop, but some users
            // will want the POWER
1519
            |new_conn, kill_ch| {
1520
            log(debug, "SERVER: new connection!");
1521
            do comm::listen |cont_ch| {
1522
                do task::spawn_sched(task::manual_threads(1u)) {
1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544
                    log(debug, "SERVER: starting worker for new req");

                    let accept_result = accept(new_conn);
                    log(debug, "SERVER: after accept()");
                    if result::is_err(accept_result) {
                        log(debug, "SERVER: error accept connection");
                        let err_data = result::get_err(accept_result);
                        comm::send(kill_ch, some(err_data));
                        log(debug,
                            "SERVER/WORKER: send on err cont ch");
                        cont_ch.send(());
                    }
                    else {
                        log(debug,
                            "SERVER/WORKER: send on cont ch");
                        cont_ch.send(());
                        let sock = result::unwrap(accept_result);
                        log(debug, "SERVER: successfully accepted"+
                            "connection!");
                        let received_req_bytes = read(sock, 0u);
                        alt received_req_bytes {
                          result::ok(data) {
1545 1546 1547
                            log(debug, "SERVER: got REQ str::from_bytes..");
                            log(debug, #fmt("SERVER: REQ data len: %?",
                                            vec::len(data)));
1548 1549 1550 1551 1552 1553 1554 1555 1556 1557
                            server_ch.send(
                                str::from_bytes(data));
                            log(debug, "SERVER: before write");
                            tcp_write_single(sock, str::bytes(resp));
                            log(debug, "SERVER: after write.. die");
                            comm::send(kill_ch, none);
                          }
                          result::err(err_data) {
                            log(debug, #fmt("SERVER: error recvd: %s %s",
                                err_data.err_name, err_data.err_msg));
1558
                            comm::send(kill_ch, some(err_data));
1559 1560
                            server_ch.send("");
                          }
1561
                        }
1562
                        log(debug, "SERVER: worker spinning down");
1563
                    }
1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575
                }
                log(debug, "SERVER: waiting to recv on cont_ch");
                cont_ch.recv()
            };
            log(debug, "SERVER: recv'd on cont_ch..leaving listen cb");
        });
        // err check on listen_result
        if result::is_err(listen_result) {
            alt result::get_err(listen_result) {
              generic_listen_err(name, msg) {
                fail #fmt("SERVER: exited abnormally name %s msg %s",
                                name, msg);
1576
              }
1577 1578 1579 1580 1581
              access_denied {
                fail "SERVER: exited abnormally, got access denied..";
              }
              address_in_use {
                fail "SERVER: exited abnormally, got address in use...";
1582
              }
1583
            }
1584
        }
1585 1586 1587 1588 1589
        let ret_val = server_ch.recv();
        log(debug, #fmt("SERVER: exited and got ret val: '%s'", ret_val));
        ret_val
    }

1590 1591 1592 1593 1594
    fn run_tcp_test_server_fail(server_ip: str, server_port: uint,
                          iotask: iotask) -> tcp_listen_err_data {
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1595
            |kill_ch| {
1596 1597 1598
                log(debug, #fmt("establish_cb %?",
                    kill_ch));
            },
1599
            |new_conn, kill_ch| {
1600 1601 1602 1603
                fail #fmt("SERVER: shouldn't be called.. %? %?",
                           new_conn, kill_ch);
        });
        // err check on listen_result
1604
        if result::is_err(listen_result) {
1605 1606 1607 1608 1609 1610 1611
            result::get_err(listen_result)
        }
        else {
            fail "SERVER: did not fail as expected"
        }
    }

1612
    fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str,
1613
                          client_ch: comm::chan<str>,
1614 1615
                          iotask: iotask) -> result::result<str,
                                                    tcp_connect_err_data> {
1616
        let server_ip_addr = ip::v4::parse_addr(server_ip);
1617

1618
        log(debug, "CLIENT: starting..");
1619
        let connect_result = connect(server_ip_addr, server_port, iotask);
1620
        if result::is_err(connect_result) {
1621
            log(debug, "CLIENT: failed to connect");
1622
            let err_data = result::get_err(connect_result);
1623
            err(err_data)
1624
        }
1625 1626 1627 1628
        else {
            let sock = result::unwrap(connect_result);
            let resp_bytes = str::bytes(resp);
            tcp_write_single(sock, resp_bytes);
1629
            let read_result = sock.read(0u);
1630
            if read_result.is_err() {
1631
                log(debug, "CLIENT: failure to read");
1632
                ok("")
1633
            }
1634 1635 1636 1637 1638
            else {
                client_ch.send(str::from_bytes(read_result.get()));
                let ret_val = client_ch.recv();
                log(debug, #fmt("CLIENT: after client_ch recv ret: '%s'",
                   ret_val));
1639
                ok(ret_val)
1640 1641
            }
        }
1642 1643
    }

1644
    fn tcp_write_single(sock: tcp_socket, val: ~[u8]) {
1645 1646
        let write_result_future = sock.write_future(val);
        let write_result = write_result_future.get();
1647
        if result::is_err(write_result) {
1648
            log(debug, "tcp_write_single: write failed!");
1649
            let err_data = result::get_err(write_result);
1650 1651 1652 1653
            log(debug, #fmt("tcp_write_single err name: %s msg: %s",
                err_data.err_name, err_data.err_msg));
            // meh. torn on what to do here.
            fail "tcp_write_single failed";
1654
        }
1655
    }
1656
}