net_tcp.rs 59.3 KB
Newer Older
1
//! High-level interface to libuv's TCP functionality
2

J
Jeff Olson 已提交
3
import ip = net_ip;
4
import uv::iotask;
B
Brian Anderson 已提交
5
import uv::iotask::IoTask;
6
import future_spawn = future::spawn;
J
Jeff Olson 已提交
7 8
// FIXME #1935
// should be able to, but can't atm, replace w/ result::{result, extensions};
9 10
import result::*;
import libc::size_t;
11
import io::{Reader, ReaderUtil, Writer};
E
Eric Holk 已提交
12
import comm = core::comm;
J
Jeff Olson 已提交
13

14
// tcp interfaces
15
export tcp_socket;
16 17
// buffered socket
export tcp_socket_buf, socket_buf;
18 19
// errors
export tcp_err_data, tcp_connect_err_data;
20
// operations on a tcp_socket
21
export write, write_future, read_start, read_stop;
22
// tcp server stuff
23
export listen, accept;
24 25
// tcp client stuff
export connect;
J
Jeff Olson 已提交
26

27
#[nolink]
28
extern mod rustrt {
29
    fn rust_uv_current_kernel_malloc(size: libc::c_uint) -> *libc::c_void;
30
    fn rust_uv_current_kernel_free(mem: *libc::c_void);
31
    fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
32 33
}

34 35 36 37 38 39 40
/**
 * Encapsulates an open TCP/IP connection through libuv
 *
 * `tcp_socket` is non-copyable/sendable and automagically handles closing the
 * underlying libuv data structures when it goes out of scope. This is the
 * data structure that is used for read/write operations over a TCP stream.
 */
B
Brian Anderson 已提交
41
struct tcp_socket {
42 43 44
  let socket_data: @tcp_socket_data;
  new(socket_data: @tcp_socket_data) { self.socket_data = socket_data; }
  drop {
45
    unsafe {
46
        tear_down_socket_data(self.socket_data)
47 48
    }
  }
J
Jeff Olson 已提交
49 50
}

51 52 53 54
/**
 * A buffered wrapper for `net::tcp::tcp_socket`
 *
 * It is created with a call to `net::tcp::socket_buf()` and has impls that
55
 * satisfy both the `io::reader` and `io::writer` traits.
56
 */
B
Brian Anderson 已提交
57
struct tcp_socket_buf {
58 59
  let data: @tcp_buffered_socket_data;
  new(data: @tcp_buffered_socket_data) { self.data = data; }
60 61
}

62
/// Contains raw, string-based, error information returned from libuv
63
type tcp_err_data = {
64 65
    err_name: ~str,
    err_msg: ~str
66
};
67
/// Details returned as part of a `result::err` result from `tcp::listen`
68
enum tcp_listen_err_data {
69 70 71 72
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
73
    generic_listen_err(~str, ~str),
74 75 76 77 78 79 80
    /**
     * Failed to bind to the requested IP/Port, because it is already in use.
     *
     * # Possible Causes
     *
     * * Attempting to bind to a port already bound to another listener
     */
81
    address_in_use,
82 83 84 85 86 87 88 89 90 91
    /**
     * Request to bind to an IP/Port was denied by the system.
     *
     * # Possible Causes
     *
     * * Attemping to binding to an IP/Port as a non-Administrator
     *   on Windows Vista+
     * * Attempting to bind, as a non-priv'd
     *   user, to 'privileged' ports (< 1024) on *nix
     */
92 93
    access_denied
}
94
/// Details returned as part of a `result::err` result from `tcp::connect`
95
enum tcp_connect_err_data {
96 97 98 99
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
100
    generic_connect_err(~str, ~str),
101
    /// Invalid IP or invalid port
102 103
    connection_refused
}
104

105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
/**
 * Initiate a client connection over TCP/IP
 *
 * # Arguments
 *
 * * `input_ip` - The IP address (versions 4 or 6) of the remote host
 * * `port` - the unsigned integer of the desired remote host port
 * * `iotask` - a `uv::iotask` that the tcp request will run on
 *
 * # Returns
 *
 * A `result` that, if the operation succeeds, contains a
 * `net::net::tcp_socket` that can be used to send and receive data to/from
 * the remote host. In the event of failure, a
 * `net::tcp::tcp_connect_err_data` instance will be returned
 */
121
fn connect(-input_ip: ip::ip_addr, port: uint,
B
Brian Anderson 已提交
122
           iotask: IoTask)
123
    -> result::Result<tcp_socket, tcp_connect_err_data> unsafe {
124 125
    let result_po = core::comm::Port::<conn_attempt>();
    let closed_signal_po = core::comm::Port::<()>();
J
Jeff Olson 已提交
126
    let conn_data = {
127 128
        result_ch: core::comm::Chan(result_po),
        closed_signal_ch: core::comm::Chan(closed_signal_po)
J
Jeff Olson 已提交
129 130
    };
    let conn_data_ptr = ptr::addr_of(conn_data);
131
    let reader_po = core::comm::Port::<result::Result<~[u8], tcp_err_data>>();
132 133 134
    let stream_handle_ptr = malloc_uv_tcp_t();
    *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
    let socket_data = @{
135
        reader_po: reader_po,
136
        reader_ch: core::comm::Chan(reader_po),
137 138 139
        stream_handle_ptr: stream_handle_ptr,
        connect_req: uv::ll::connect_t(),
        write_req: uv::ll::write_t(),
140
        iotask: iotask
J
Jeff Olson 已提交
141
    };
142
    let socket_data_ptr = ptr::addr_of(*socket_data);
P
Paul Stansifer 已提交
143
    log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch));
J
Jeff Olson 已提交
144 145
    // get an unsafe representation of our stream_handle_ptr that
    // we can send into the interact cb to be handled in libuv..
P
Paul Stansifer 已提交
146 147
    log(debug, fmt!("stream_handle_ptr outside interact %?",
        stream_handle_ptr));
148
    do iotask::interact(iotask) |loop_ptr| unsafe {
149
        log(debug, ~"in interact cb for tcp client connect..");
P
Paul Stansifer 已提交
150 151
        log(debug, fmt!("stream_handle_ptr in interact %?",
            stream_handle_ptr));
152
        match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
B
Brian Anderson 已提交
153
          0i32 => {
154
            log(debug, ~"tcp_init successful");
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
            log(debug, ~"dealing w/ ipv4 connection..");
            let connect_req_ptr =
                ptr::addr_of((*socket_data_ptr).connect_req);
            let addr_str = ip::format_addr(input_ip);
            let connect_result = match input_ip {
              ip::ipv4(addr) => {
                // have to "recreate" the sockaddr_in/6
                // since the ip_addr discards the port
                // info.. should probably add an additional
                // rust type that actually is closer to
                // what the libuv API expects (ip str + port num)
                log(debug, fmt!("addr: %?", addr));
                let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                uv::ll::tcp_connect(
                    connect_req_ptr,
                    stream_handle_ptr,
                    ptr::addr_of(in_addr),
                    tcp_connect_on_connect_cb)
              }
              ip::ipv6(addr) => {
                log(debug, fmt!("addr: %?", addr));
                let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                uv::ll::tcp_connect6(
                    connect_req_ptr,
                    stream_handle_ptr,
                    ptr::addr_of(in_addr),
                    tcp_connect_on_connect_cb)
              }
            };
            match connect_result {
              0i32 => {
                log(debug, ~"tcp_connect successful");
                // reusable data that we'll have for the
                // duration..
                uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                           socket_data_ptr as
                                              *libc::c_void);
                // just so the connect_cb can send the
                // outcome..
                uv::ll::set_data_for_req(connect_req_ptr,
                                         conn_data_ptr);
                log(debug, ~"leaving tcp_connect interact cb...");
                // let tcp_connect_on_connect_cb send on
                // the result_ch, now..
              }
              _ => {
                // immediate connect failure.. probably a garbage
                // ip or somesuch
                let err_data = uv::ll::get_last_err_data(loop_ptr);
                core::comm::send((*conn_data_ptr).result_ch,
                           conn_failure(err_data.to_tcp_err()));
                uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                               conn_data_ptr);
                uv::ll::close(stream_handle_ptr, stream_error_close_cb);
J
Jeff Olson 已提交
209 210 211
              }
            }
        }
B
Brian Anderson 已提交
212
          _ => {
J
Jeff Olson 已提交
213 214
            // failure to create a tcp handle
            let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
215
            core::comm::send((*conn_data_ptr).result_ch,
216
                       conn_failure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
217 218 219
          }
        }
    };
E
Eric Holk 已提交
220
    match core::comm::recv(result_po) {
B
Brian Anderson 已提交
221
      conn_success => {
222
        log(debug, ~"tcp::connect - received success on result_po");
223
        result::Ok(tcp_socket(socket_data))
J
Jeff Olson 已提交
224
      }
B
Brian Anderson 已提交
225
      conn_failure(err_data) => {
E
Eric Holk 已提交
226
        core::comm::recv(closed_signal_po);
227
        log(debug, ~"tcp::connect - received failure on result_po");
228 229 230
        // still have to free the malloc'd stream handle..
        rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                           as *libc::c_void);
231
        let tcp_conn_err = match err_data.err_name {
B
Brian Anderson 已提交
232 233
          ~"ECONNREFUSED" => connection_refused,
          _ => generic_connect_err(err_data.err_name, err_data.err_msg)
234
        };
235
        result::Err(tcp_conn_err)
J
Jeff Olson 已提交
236 237 238
      }
    }
}
239

240 241 242 243 244 245
/**
 * Write binary data to a tcp stream; Blocks until operation completes
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
246
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
247 248 249 250 251 252 253
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `result` object with a `nil` value as the `ok` variant, or a
 * `tcp_err_data` value as the `err` variant
 */
254
fn write(sock: tcp_socket, raw_write_data: ~[u8])
255
    -> result::Result<(), tcp_err_data> unsafe {
256
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
257 258 259
    write_common_impl(socket_data_ptr, raw_write_data)
}

260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
/**
 * Write binary data to tcp stream; Returns a `future::future` value
 * immediately
 *
 * # Safety
 *
 * This function can produce unsafe results if:
 *
 * 1. the call to `write_future` is made
 * 2. the `future::future` value returned is never resolved via
 * `future::get`
 * 3. and then the `tcp_socket` passed in to `write_future` leaves
 * scope and is destructed before the task that runs the libuv write
 * operation completes.
 *
 * As such: If using `write_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released write
 * handle. Otherwise, use the blocking `tcp::write` function instead.
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
282
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
283 284 285 286 287 288 289 290
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `future` value that, once the `write` operation completes, resolves to a
 * `result` object with a `nil` value as the `ok` variant, or a `tcp_err_data`
 * value as the `err` variant
 */
291
fn write_future(sock: tcp_socket, raw_write_data: ~[u8])
292
    -> future::Future<result::Result<(), tcp_err_data>> unsafe {
293
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
294
    do future_spawn {
295
        let data_copy = copy(raw_write_data);
296
        write_common_impl(socket_data_ptr, data_copy)
297
    }
298 299
}

300 301 302 303 304 305 306 307 308 309 310
/**
 * Begin reading binary data from an open TCP connection; used with
 * `read_stop`
 *
 * # Arguments
 *
 * * sock -- a `net::tcp::tcp_socket` for the connection to read from
 *
 * # Returns
 *
 * * A `result` instance that will either contain a
E
Eric Holk 已提交
311 312 313
 * `core::comm::port<tcp_read_result>` that the user can read (and
 * optionally, loop on) from until `read_stop` is called, or a
 * `tcp_err_data` record
314
 */
315
fn read_start(sock: tcp_socket)
316 317
    -> result::Result<comm::Port<
        result::Result<~[u8], tcp_err_data>>, tcp_err_data> unsafe {
318
    let socket_data = ptr::addr_of(*(sock.socket_data));
319
    read_start_common_impl(socket_data)
320
}
321

322 323 324 325 326 327 328
/**
 * Stop reading from an open TCP connection; used with `read_start`
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on
 */
329
fn read_stop(sock: tcp_socket,
330 331
             -read_port: comm::Port<result::Result<~[u8], tcp_err_data>>) ->
    result::Result<(), tcp_err_data> unsafe {
P
Paul Stansifer 已提交
332
    log(debug, fmt!("taking the read_port out of commission %?", read_port));
333
    let socket_data = ptr::addr_of(*sock.socket_data);
334 335 336
    read_stop_common_impl(socket_data)
}

337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
/**
 * Reads a single chunk of data from `tcp_socket`; block until data/error
 * recv'd
 *
 * Does a blocking read operation for a single chunk of data from a
 * `tcp_socket` until a data arrives or an error is received. The provided
 * `timeout_msecs` value is used to raise an error if the timeout period
 * passes without any data received.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
352
fn read(sock: tcp_socket, timeout_msecs: uint)
353
    -> result::Result<~[u8],tcp_err_data> {
354
    let socket_data = ptr::addr_of(*(sock.socket_data));
355 356 357
    read_common_impl(socket_data, timeout_msecs)
}

358
/**
359
 * Reads a single chunk of data; returns a `future::future<~[u8]>`
360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386
 * immediately
 *
 * Does a non-blocking read operation for a single chunk of data from a
 * `tcp_socket` and immediately returns a `future` value representing the
 * result. When resolving the returned `future`, it will block until data
 * arrives or an error is received. The provided `timeout_msecs`
 * value is used to raise an error if the timeout period passes without any
 * data received.
 *
 * # Safety
 *
 * This function can produce unsafe results if the call to `read_future` is
 * made, the `future::future` value returned is never resolved via
 * `future::get`, and then the `tcp_socket` passed in to `read_future` leaves
 * scope and is destructed before the task that runs the libuv read
 * operation completes.
 *
 * As such: If using `read_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released read
 * handle. Otherwise, use the blocking `tcp::read` function instead.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
387
fn read_future(sock: tcp_socket, timeout_msecs: uint)
388
    -> future::Future<result::Result<~[u8],tcp_err_data>> {
389
    let socket_data = ptr::addr_of(*(sock.socket_data));
390
    do future_spawn {
391
        read_common_impl(socket_data, timeout_msecs)
392
    }
393
}
394

395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
/**
 * Bind an incoming client connection to a `net::tcp::tcp_socket`
 *
 * # Notes
 *
 * It is safe to call `net::tcp::accept` _only_ within the context of the
 * `new_connect_cb` callback provided as the final argument to the
 * `net::tcp::listen` function.
 *
 * The `new_conn` opaque value is provided _only_ as the first argument to the
 * `new_connect_cb` provided as a part of `net::tcp::listen`.
 * It can be safely sent to another task but it _must_ be
 * used (via `net::tcp::accept`) before the `new_connect_cb` call it was
 * provided to returns.
 *
 * This implies that a port/chan pair must be used to make sure that the
 * `new_connect_cb` call blocks until an attempt to create a
 * `net::tcp::tcp_socket` is completed.
 *
 * # Example
 *
 * Here, the `new_conn` is used in conjunction with `accept` from within
 * a task spawned by the `new_connect_cb` passed into `listen`
 *
 * ~~~~~~~~~~~
 * net::tcp::listen(remote_ip, remote_port, backlog)
 *     // this callback is ran once after the connection is successfully
 *     // set up
 *     {|kill_ch|
 *       // pass the kill_ch to your main loop or wherever you want
 *       // to be able to externally kill the server from
 *     }
 *     // this callback is ran when a new connection arrives
 *     {|new_conn, kill_ch|
E
Eric Holk 已提交
429 430
 *     let cont_po = core::comm::port::<option<tcp_err_data>>();
 *     let cont_ch = core::comm::chan(cont_po);
431 432 433
 *     task::spawn {||
 *         let accept_result = net::tcp::accept(new_conn);
 *         if accept_result.is_err() {
E
Eric Holk 已提交
434
 *             core::comm::send(cont_ch, result::get_err(accept_result));
435 436 437 438
 *             // fail?
 *         }
 *         else {
 *             let sock = result::get(accept_result);
E
Eric Holk 已提交
439
 *             core::comm::send(cont_ch, true);
440 441 442
 *             // do work here
 *         }
 *     };
E
Eric Holk 已提交
443
 *     match core::comm::recv(cont_po) {
444
 *       // shut down listen()
E
Eric Holk 已提交
445
 *       some(err_data) { core::comm::send(kill_chan, some(err_data)) }
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463
 *       // wait for next connection
 *       none {}
 *     }
 * };
 * ~~~~~~~~~~~
 *
 * # Arguments
 *
 * * `new_conn` - an opaque value used to create a new `tcp_socket`
 *
 * # Returns
 *
 * On success, this function will return a `net::tcp::tcp_socket` as the
 * `ok` variant of a `result`. The `net::tcp::tcp_socket` is anchored within
 * the task that `accept` was called within for its lifetime. On failure,
 * this function will return a `net::tcp::tcp_err_data` record
 * as the `err` variant of a `result`.
 */
464
fn accept(new_conn: tcp_new_connection)
465
    -> result::Result<tcp_socket, tcp_err_data> unsafe {
466

467
    match new_conn{
B
Brian Anderson 已提交
468
      new_tcp_conn(server_handle_ptr) => {
469
        let server_data_ptr = uv::ll::get_data_for_uv_handle(
470
            server_handle_ptr) as *tcp_listen_fc_data;
471
        let reader_po = core::comm::Port();
472
        let iotask = (*server_data_ptr).iotask;
473 474 475
        let stream_handle_ptr = malloc_uv_tcp_t();
        *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
        let client_socket_data = @{
476
            reader_po: reader_po,
477
            reader_ch: core::comm::Chan(reader_po),
478
            stream_handle_ptr : stream_handle_ptr,
479 480
            connect_req : uv::ll::connect_t(),
            write_req : uv::ll::write_t(),
481
            iotask : iotask
482
        };
483 484 485
        let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
        let client_stream_handle_ptr =
            (*client_socket_data_ptr).stream_handle_ptr;
486

487 488
        let result_po = core::comm::Port::<Option<tcp_err_data>>();
        let result_ch = core::comm::Chan(result_po);
489 490 491 492 493 494 495

        // UNSAFE LIBUV INTERACTION BEGIN
        // .. normally this happens within the context of
        // a call to uv::hl::interact.. but we're breaking
        // the rules here because this always has to be
        // called within the context of a listen() new_connect_cb
        // callback (or it will likely fail and drown your cat)
496
        log(debug, ~"in interact cb for tcp::accept");
497 498
        let loop_ptr = uv::ll::get_loop_for_uv_handle(
            server_handle_ptr);
499
        match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
B
Brian Anderson 已提交
500
          0i32 => {
501
            log(debug, ~"uv_tcp_init successful for client stream");
502
            match uv::ll::accept(
503 504
                server_handle_ptr as *libc::c_void,
                client_stream_handle_ptr as *libc::c_void) {
B
Brian Anderson 已提交
505
              0i32 => {
506
                log(debug, ~"successfully accepted client connection");
507
                uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
508 509
                                               client_socket_data_ptr
                                                   as *libc::c_void);
B
Brian Anderson 已提交
510
                core::comm::send(result_ch, None);
511
              }
B
Brian Anderson 已提交
512
              _ => {
513
                log(debug, ~"failed to accept client conn");
B
Brian Anderson 已提交
514
                core::comm::send(result_ch, Some(
515 516 517
                    uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
              }
            }
518
          }
B
Brian Anderson 已提交
519
          _ => {
520
            log(debug, ~"failed to init client stream");
B
Brian Anderson 已提交
521
            core::comm::send(result_ch, Some(
522 523 524 525
                uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
          }
        }
        // UNSAFE LIBUV INTERACTION END
E
Eric Holk 已提交
526
        match core::comm::recv(result_po) {
527 528
          Some(err_data) => result::Err(err_data),
          None => result::Ok(tcp_socket(client_socket_data))
529 530 531 532 533
        }
      }
    }
}

534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551
/**
 * Bind to a given IP/port and listen for new connections
 *
 * # Arguments
 *
 * * `host_ip` - a `net::ip::ip_addr` representing a unique IP
 * (versions 4 or 6)
 * * `port` - a uint representing the port to listen on
 * * `backlog` - a uint representing the number of incoming connections
 * to cache in memory
 * * `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
 * * `on_establish_cb` - a callback that is evaluated if/when the listener
 * is successfully established. it takes no parameters
 * * `new_connect_cb` - a callback to be evaluated, on the libuv thread,
 * whenever a client attempts to conect on the provided ip/port. the
 * callback's arguments are:
 *     * `new_conn` - an opaque type that can be passed to
 *     `net::tcp::accept` in order to be converted to a `tcp_socket`.
E
Eric Holk 已提交
552 553
 *     * `kill_ch` - channel of type `core::comm::chan<option<tcp_err_data>>`.
 *     this channel can be used to send a message to cause `listen` to begin
554 555 556 557 558 559 560 561
 *     closing the underlying libuv data structures.
 *
 * # returns
 *
 * a `result` instance containing empty data of type `()` on a
 * successful/normal shutdown, and a `tcp_listen_err_data` enum in the event
 * of listen exiting because of an error
 */
562
fn listen(-host_ip: ip::ip_addr, port: uint, backlog: uint,
B
Brian Anderson 已提交
563
          iotask: IoTask,
B
Brian Anderson 已提交
564
          on_establish_cb: fn~(comm::Chan<Option<tcp_err_data>>),
565
          +new_connect_cb: fn~(tcp_new_connection,
B
Brian Anderson 已提交
566
                               comm::Chan<Option<tcp_err_data>>))
567
    -> result::Result<(), tcp_listen_err_data> unsafe {
568
    do listen_common(host_ip, port, backlog, iotask, on_establish_cb)
569
        // on_connect_cb
570
        |handle| unsafe {
571 572 573 574 575 576 577 578
            let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
                as *tcp_listen_fc_data;
            let new_conn = new_tcp_conn(handle);
            let kill_ch = (*server_data_ptr).kill_ch;
            new_connect_cb(new_conn, kill_ch);
    }
}

579
fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint,
B
Brian Anderson 已提交
580
          iotask: IoTask,
B
Brian Anderson 已提交
581
          on_establish_cb: fn~(comm::Chan<Option<tcp_err_data>>),
582
          -on_connect_cb: fn~(*uv::ll::uv_tcp_t))
583
    -> result::Result<(), tcp_listen_err_data> unsafe {
584 585 586
    let stream_closed_po = core::comm::Port::<()>();
    let kill_po = core::comm::Port::<Option<tcp_err_data>>();
    let kill_ch = core::comm::Chan(kill_po);
587 588 589 590
    let server_stream = uv::ll::tcp_t();
    let server_stream_ptr = ptr::addr_of(server_stream);
    let server_data = {
        server_stream_ptr: server_stream_ptr,
591
        stream_closed_ch: core::comm::Chan(stream_closed_po),
592
        kill_ch: kill_ch,
593
        on_connect_cb: on_connect_cb,
594
        iotask: iotask,
595 596 597 598
        mut active: true
    };
    let server_data_ptr = ptr::addr_of(server_data);

E
Eric Holk 已提交
599
    let setup_result = do core::comm::listen |setup_ch| {
J
Jeff Olson 已提交
600
        // this is to address a compiler warning about
601 602 603 604
        // an implicit copy.. it seems that double nested
        // will defeat a move sigil, as is done to the host_ip
        // arg above.. this same pattern works w/o complaint in
        // tcp::connect (because the iotask::interact cb isn't
E
Eric Holk 已提交
605
        // nested within a core::comm::listen block)
606
        let loc_ip = copy(host_ip);
607
        do iotask::interact(iotask) |loop_ptr| unsafe {
608
            match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
B
Brian Anderson 已提交
609
              0i32 => {
610 611 612
                uv::ll::set_data_for_uv_handle(
                    server_stream_ptr,
                    server_data_ptr);
613
                let addr_str = ip::format_addr(loc_ip);
614
                let bind_result = match loc_ip {
B
Brian Anderson 已提交
615
                  ip::ipv4(addr) => {
P
Paul Stansifer 已提交
616
                    log(debug, fmt!("addr: %?", addr));
617 618 619 620
                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                    uv::ll::tcp_bind(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
B
Brian Anderson 已提交
621
                  ip::ipv6(addr) => {
P
Paul Stansifer 已提交
622
                    log(debug, fmt!("addr: %?", addr));
623 624 625 626 627
                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                    uv::ll::tcp_bind6(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
                };
628
                match bind_result {
B
Brian Anderson 已提交
629
                  0i32 => {
630
                    match uv::ll::listen(server_stream_ptr,
631 632
                                       backlog as libc::c_int,
                                       tcp_lfc_on_connection_cb) {
B
Brian Anderson 已提交
633
                      0i32 => core::comm::send(setup_ch, None),
B
Brian Anderson 已提交
634
                      _ => {
635
                        log(debug, ~"failure to uv_listen()");
636
                        let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
637
                        core::comm::send(setup_ch, Some(err_data));
638 639
                      }
                    }
640
                  }
B
Brian Anderson 已提交
641
                  _ => {
642
                    log(debug, ~"failure to uv_tcp_bind");
643
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
644
                    core::comm::send(setup_ch, Some(err_data));
645 646 647
                  }
                }
              }
B
Brian Anderson 已提交
648
              _ => {
649
                log(debug, ~"failure to uv_tcp_init");
650
                let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
651
                core::comm::send(setup_ch, Some(err_data));
652 653
              }
            }
654 655
        };
        setup_ch.recv()
656
    };
657
    match setup_result {
B
Brian Anderson 已提交
658
      Some(err_data) => {
659
        do iotask::interact(iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
660 661
            log(debug, fmt!("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
662 663 664 665
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
        stream_closed_po.recv();
666
        match err_data.err_name {
B
Brian Anderson 已提交
667
          ~"EACCES" => {
668
            log(debug, ~"Got EACCES error");
669
            result::Err(access_denied)
670
          }
B
Brian Anderson 已提交
671
          ~"EADDRINUSE" => {
672
            log(debug, ~"Got EADDRINUSE error");
673
            result::Err(address_in_use)
674
          }
B
Brian Anderson 已提交
675
          _ => {
P
Paul Stansifer 已提交
676 677
            log(debug, fmt!("Got '%s' '%s' libuv error",
                            err_data.err_name, err_data.err_msg));
678
            result::Err(
679
                generic_listen_err(err_data.err_name, err_data.err_msg))
680 681 682
          }
        }
      }
B
Brian Anderson 已提交
683
      None => {
684
        on_establish_cb(kill_ch);
E
Eric Holk 已提交
685
        let kill_result = core::comm::recv(kill_po);
686
        do iotask::interact(iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
687 688
            log(debug, fmt!("tcp::listen post-kill recv hl interact %?",
                            loop_ptr));
689 690 691
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
692
        stream_closed_po.recv();
693
        match kill_result {
694
          // some failure post bind/listen
695
          Some(err_data) => result::Err(generic_listen_err(err_data.err_name,
B
Brian Anderson 已提交
696
                                                           err_data.err_msg)),
697
          // clean exit
698
          None => result::Ok(())
699 700 701 702 703
        }
      }
    }
}

704 705 706 707 708 709 710 711 712 713 714 715 716 717 718
/**
 * Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`.
 *
 * This function takes ownership of a `net::tcp::tcp_socket`, returning it
 * stored within a buffered wrapper, which can be converted to a `io::reader`
 * or `io::writer`
 *
 * # Arguments
 *
 * * `sock` -- a `net::tcp::tcp_socket` that you want to buffer
 *
 * # Returns
 *
 * A buffered wrapper that you can cast as an `io::reader` or `io::writer`
 */
719
fn socket_buf(-sock: tcp_socket) -> tcp_socket_buf {
720
    tcp_socket_buf(@{ sock: sock, mut buf: ~[] })
721 722
}

723
/// Convenience methods extending `net::tcp::tcp_socket`
B
Brian Anderson 已提交
724
impl tcp_socket {
725 726
    fn read_start() -> result::Result<comm::Port<
        result::Result<~[u8], tcp_err_data>>, tcp_err_data> {
727 728
        read_start(self)
    }
729
    fn read_stop(-read_port:
730 731
                 comm::Port<result::Result<~[u8], tcp_err_data>>) ->
        result::Result<(), tcp_err_data> {
732
        read_stop(self, read_port)
733
    }
734
    fn read(timeout_msecs: uint) ->
735
        result::Result<~[u8], tcp_err_data> {
736 737 738
        read(self, timeout_msecs)
    }
    fn read_future(timeout_msecs: uint) ->
739
        future::Future<result::Result<~[u8], tcp_err_data>> {
740 741
        read_future(self, timeout_msecs)
    }
742
    fn write(raw_write_data: ~[u8])
743
        -> result::Result<(), tcp_err_data> {
744 745
        write(self, raw_write_data)
    }
746
    fn write_future(raw_write_data: ~[u8])
747
        -> future::Future<result::Result<(), tcp_err_data>> {
748 749
        write_future(self, raw_write_data)
    }
750
}
751

752
/// Implementation of `io::reader` trait for a buffered `net::tcp::tcp_socket`
753
impl tcp_socket_buf: io::Reader {
754 755 756 757
    fn read(buf: &[mut u8], len: uint) -> uint {
        // Loop until our buffer has enough data in it for us to read from.
        while self.data.buf.len() < len {
            let read_result = read(self.data.sock, 0u);
758
            if read_result.is_err() {
759
                let err_data = read_result.get_err();
760 761 762 763

                if err_data.err_name == ~"EOF" {
                    break;
                } else {
P
Paul Stansifer 已提交
764 765
                    debug!("ERROR sock_buf as io::reader.read err %? %?",
                           err_data.err_name, err_data.err_msg);
766

B
Brian Anderson 已提交
767
                    return 0;
768
                }
769 770
            }
            else {
771
                vec::push_all(self.data.buf, result::unwrap(read_result));
772 773
            }
        }
774

B
Brian Anderson 已提交
775
        let count = uint::min(&len, &self.data.buf.len());
776 777 778 779 780 781 782 783 784

        let mut data = ~[];
        self.data.buf <-> data;

        vec::u8::memcpy(buf, vec::view(data, 0, data.len()), count);

        vec::push_all(self.data.buf, vec::view(data, count, data.len()));

        count
785 786
    }
    fn read_byte() -> int {
787 788
        let bytes = ~[0];
        if self.read(bytes, 1u) == 0 { fail } else { bytes[0] as int }
789 790
    }
    fn unread_byte(amt: int) {
791
        vec::unshift((*(self.data)).buf, amt as u8);
792 793 794 795
    }
    fn eof() -> bool {
        false // noop
    }
796
    fn seek(dist: int, seek: io::SeekStyle) {
P
Paul Stansifer 已提交
797
        log(debug, fmt!("tcp_socket_buf seek stub %? %?", dist, seek));
798 799 800 801 802 803 804
        // noop
    }
    fn tell() -> uint {
        0u // noop
    }
}

805
/// Implementation of `io::reader` trait for a buffered `net::tcp::tcp_socket`
806
impl tcp_socket_buf: io::Writer {
807
    fn write(data: &[const u8]) unsafe {
808 809
        let socket_data_ptr =
            ptr::addr_of(*((*(self.data)).sock).socket_data);
810 811
        let w_result = write_common_impl(socket_data_ptr,
                                        vec::slice(data, 0, vec::len(data)));
812
        if w_result.is_err() {
813
            let err_data = w_result.get_err();
P
Paul Stansifer 已提交
814 815
            log(debug, fmt!("ERROR sock_buf as io::writer.writer err: %? %?",
                             err_data.err_name, err_data.err_msg));
816
        }
817
    }
818
    fn seek(dist: int, seek: io::SeekStyle) {
P
Paul Stansifer 已提交
819
      log(debug, fmt!("tcp_socket_buf seek stub %? %?", dist, seek));
820 821 822 823 824 825 826 827
        // noop
    }
    fn tell() -> uint {
        0u
    }
    fn flush() -> int {
        0
    }
828 829
    fn get_type() -> io::WriterType {
        io::File
830
    }
831 832
}

833 834
// INTERNAL API

835
/// Close the libuv stream behind `socket_data` and free its handle.
///
/// Called from the `tcp_socket` destructor. The close is requested on the
/// socket's I/O task; this function then blocks until
/// `tcp_socket_dtor_close_cb` signals on `closed_ch`, and only afterwards
/// frees the stream handle memory.
fn tear_down_socket_data(socket_data: @tcp_socket_data) unsafe {
    let closed_po = core::comm::Port::<()>();
    let closed_ch = core::comm::Chan(closed_po);
    let close_data = {
        closed_ch: closed_ch
    };
    // `close_data` lives on this stack frame; the pointer stays valid
    // because we block on `closed_po` below until the close cb has run.
    let close_data_ptr = ptr::addr_of(close_data);
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
        log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?",
            stream_handle_ptr, loop_ptr));
        // hand the close callback the channel it has to signal on
        uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                       close_data_ptr);
        uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
    };
    core::comm::recv(closed_po);
    log(debug, fmt!("about to free socket_data at %?", socket_data));
    rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                       as *libc::c_void);
    log(debug, ~"exiting dtor for tcp_socket");
}

857 858
// shared implementation for tcp::read
fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
859
    -> result::Result<~[u8],tcp_err_data> unsafe {
860
    log(debug, ~"starting tcp::read");
861
    let iotask = (*socket_data).iotask;
862
    let rs_result = read_start_common_impl(socket_data);
863
    if result::is_err(rs_result) {
864
        let err_data = result::get_err(rs_result);
865
        result::Err(err_data)
866 867
    }
    else {
868
        log(debug, ~"tcp::read before recv_timeout");
869 870
        let read_result = if timeout_msecs > 0u {
            timer::recv_timeout(
871
               iotask, timeout_msecs, result::get(rs_result))
872
        } else {
B
Brian Anderson 已提交
873
            Some(core::comm::recv(result::get(rs_result)))
874
        };
875
        log(debug, ~"tcp::read after recv_timeout");
876
        match read_result {
B
Brian Anderson 已提交
877
          None => {
878
            log(debug, ~"tcp::read: timed out..");
879
            let err_data = {
880 881
                err_name: ~"TIMEOUT",
                err_msg: ~"req timed out"
882 883
            };
            read_stop_common_impl(socket_data);
884
            result::Err(err_data)
885
          }
B
Brian Anderson 已提交
886
          Some(data_result) => {
887
            log(debug, ~"tcp::read got data");
888 889 890 891 892 893 894 895 896
            read_stop_common_impl(socket_data);
            data_result
          }
        }
    }
}

// shared impl for read_stop
fn read_stop_common_impl(socket_data: *tcp_socket_data) ->
897
    result::Result<(), tcp_err_data> unsafe {
898
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
899 900
    let stop_po = core::comm::Port::<Option<tcp_err_data>>();
    let stop_ch = core::comm::Chan(stop_po);
901
    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
902
        log(debug, ~"in interact cb for tcp::read_stop");
903
        match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
B
Brian Anderson 已提交
904
          0i32 => {
905
            log(debug, ~"successfully called uv_read_stop");
B
Brian Anderson 已提交
906
            core::comm::send(stop_ch, None);
907
          }
B
Brian Anderson 已提交
908
          _ => {
909
            log(debug, ~"failure in calling uv_read_stop");
910
            let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
911
            core::comm::send(stop_ch, Some(err_data.to_tcp_err()));
912 913 914
          }
        }
    };
E
Eric Holk 已提交
915
    match core::comm::recv(stop_po) {
916 917
      Some(err_data) => result::Err(err_data.to_tcp_err()),
      None => result::Ok(())
918 919 920 921 922
    }
}

// shared impl for read_start
fn read_start_common_impl(socket_data: *tcp_socket_data)
923 924
    -> result::Result<comm::Port<
        result::Result<~[u8], tcp_err_data>>, tcp_err_data> unsafe {
925
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
926 927
    let start_po = core::comm::Port::<Option<uv::ll::uv_err_data>>();
    let start_ch = core::comm::Chan(start_po);
928
    log(debug, ~"in tcp::read_start before interact loop");
929
    do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
930
        log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr));
931
        match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
932 933
                               on_alloc_cb,
                               on_tcp_read_cb) {
B
Brian Anderson 已提交
934
          0i32 => {
935
            log(debug, ~"success doing uv_read_start");
B
Brian Anderson 已提交
936
            core::comm::send(start_ch, None);
937
          }
B
Brian Anderson 已提交
938
          _ => {
939
            log(debug, ~"error attempting uv_read_start");
940
            let err_data = uv::ll::get_last_err_data(loop_ptr);
B
Brian Anderson 已提交
941
            core::comm::send(start_ch, Some(err_data));
942 943 944
          }
        }
    };
E
Eric Holk 已提交
945
    match core::comm::recv(start_po) {
946 947
      Some(err_data) => result::Err(err_data.to_tcp_err()),
      None => result::Ok((*socket_data).reader_po)
948 949 950
    }
}

951 952
// helper to convert a "class" vector of [u8] to a *[uv::ll::uv_buf_t]

953 954
// shared implementation used by write and write_future
fn write_common_impl(socket_data_ptr: *tcp_socket_data,
955
                     raw_write_data: ~[u8])
956
    -> result::Result<(), tcp_err_data> unsafe {
957 958 959
    let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
    let stream_handle_ptr =
        (*socket_data_ptr).stream_handle_ptr;
960
    let write_buf_vec =  ~[ uv::ll::buf_init(
961
        vec::unsafe::to_ptr(raw_write_data),
962
        vec::len(raw_write_data)) ];
963
    let write_buf_vec_ptr = ptr::addr_of(write_buf_vec);
964
    let result_po = core::comm::Port::<tcp_write_result>();
965
    let write_data = {
966
        result_ch: core::comm::Chan(result_po)
967 968
    };
    let write_data_ptr = ptr::addr_of(write_data);
969
    do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| unsafe {
P
Paul Stansifer 已提交
970
        log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr));
971
        match uv::ll::write(write_req_ptr,
972 973 974
                          stream_handle_ptr,
                          write_buf_vec_ptr,
                          tcp_write_complete_cb) {
B
Brian Anderson 已提交
975
          0i32 => {
976
            log(debug, ~"uv_write() invoked successfully");
977 978
            uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
          }
B
Brian Anderson 已提交
979
          _ => {
980
            log(debug, ~"error invoking uv_write()");
981
            let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
982
            core::comm::send((*write_data_ptr).result_ch,
983 984 985 986
                       tcp_write_error(err_data.to_tcp_err()));
          }
        }
    };
987 988 989 990
    // FIXME (#2656): Instead of passing unsafe pointers to local data,
    // and waiting here for the write to complete, we should transfer
    // ownership of everything to the I/O task and let it deal with the
    // aftermath, so we don't have to sit here blocking.
E
Eric Holk 已提交
991
    match core::comm::recv(result_po) {
992 993
      tcp_write_success => result::Ok(()),
      tcp_write_error(err_data) => result::Err(err_data.to_tcp_err())
994 995 996
    }
}

997 998 999 1000
/// Wrapper around a raw pointer to a libuv TCP stream.
// NOTE(review): not referenced in this part of the file; presumably the
// value handed to `accept` for a pending connection -- confirm.
enum tcp_new_connection {
    new_tcp_conn(*uv::ll::uv_tcp_t)
}

1001
/// State read by the libuv callbacks of a listening server stream
/// (`tcp_lfc_close_cb` and `tcp_lfc_on_connection_cb`).
type tcp_listen_fc_data = {
    // the listening server stream
    server_stream_ptr: *uv::ll::uv_tcp_t,
    // signalled by `tcp_lfc_close_cb` once the server stream is closed
    stream_closed_ch: comm::Chan<()>,
    // receives an error when a connection callback reports failure
    kill_ch: comm::Chan<Option<tcp_err_data>>,
    // invoked with the server handle for each successful connection event
    on_connect_cb: fn~(*uv::ll::uv_tcp_t),
    iotask: IoTask,
    // cleared to make `tcp_lfc_on_connection_cb` ignore further events
    mut active: bool
};

G
Graydon Hoare 已提交
1010
/// libuv close callback for a listening server stream: signals on the
/// `stream_closed_ch` stored in the handle's `tcp_listen_fc_data`.
extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let server_data_ptr = uv::ll::get_data_for_uv_handle(
        handle) as *tcp_listen_fc_data;
    core::comm::send((*server_data_ptr).stream_closed_ch, ());
}

G
Graydon Hoare 已提交
1016
/// libuv connection callback for a listening server stream.
///
/// Does nothing once the listener has been marked inactive. Otherwise a
/// zero `status` invokes the user's `on_connect_cb` with the server
/// handle; any other status sends the loop's last error on `kill_ch` and
/// marks the listener inactive.
extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
                                     status: libc::c_int) unsafe {
    let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
        as *tcp_listen_fc_data;
    let kill_ch = (*server_data_ptr).kill_ch;
    if (*server_data_ptr).active {
        match status {
          0i32 => (*server_data_ptr).on_connect_cb(handle),
          _ => {
            let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
            core::comm::send(kill_ch,
                       Some(uv::ll::get_last_err_data(loop_ptr)
                            .to_tcp_err()));
            (*server_data_ptr).active = false;
          }
        }
    }
}

1035
/// Allocate memory for one `uv_tcp_t` through the runtime's
/// `rust_uv_current_kernel_malloc`, sized by
/// `rust_uv_helper_uv_tcp_t_size()`. The matching release call used in this
/// file is `rust_uv_current_kernel_free`.
fn malloc_uv_tcp_t() -> *uv::ll::uv_tcp_t unsafe {
    rustrt::rust_uv_current_kernel_malloc(
        rustrt::rust_uv_helper_uv_tcp_t_size()) as *uv::ll::uv_tcp_t
}

1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050
/// Outcome of a connection attempt: the connected socket, or error data.
// NOTE(review): not referenced in this part of the file -- confirm it is
// still used before relying on it.
enum tcp_connect_result {
    tcp_connected(tcp_socket),
    tcp_connect_error(tcp_err_data)
}

/// Outcome of a write, as delivered on a `write_req_data.result_ch` and
/// consumed by `write_common_impl`.
enum tcp_write_result {
    // the write completed
    tcp_write_success,
    // the write failed; carries the error details
    tcp_write_error(tcp_err_data)
}

enum tcp_read_start_result {
1051
    tcp_read_start_success(comm::Port<tcp_read_result>),
1052 1053 1054 1055
    tcp_read_start_error(tcp_err_data)
}

enum tcp_read_result {
1056
    tcp_read_data(~[u8]),
1057 1058 1059 1060
    tcp_read_done,
    tcp_read_err(tcp_err_data)
}

1061
trait to_tcp_err {
1062 1063 1064
    fn to_tcp_err() -> tcp_err_data;
}

B
Brian Anderson 已提交
1065
/// Converts libuv error data by carrying over its `err_name` and `err_msg`
/// fields unchanged.
impl uv::ll::uv_err_data: to_tcp_err {
    fn to_tcp_err() -> tcp_err_data {
        { err_name: self.err_name, err_msg: self.err_msg }
    }
}

G
Graydon Hoare 已提交
1071
/// libuv read callback installed by `read_start_common_impl`.
///
/// Dispatches on `nread`:
///
/// * `-1` -- sends the loop's last error (converted to `tcp_err_data`) on
///   the socket's `reader_ch`
/// * `0` -- nothing to deliver
/// * anything else -- copies `nread` bytes out of `buf` and sends them on
///   `reader_ch`
///
/// In every case the base of `buf` is freed before returning.
extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
                    nread: libc::ssize_t,
                    ++buf: uv::ll::uv_buf_t) unsafe {
    log(debug, fmt!("entering on_tcp_read_cb stream: %? nread: %?",
                    stream, nread));
    let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
    let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
        as *tcp_socket_data;
    match nread as int {
      // incoming err.. probably eof
      -1 => {
        let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
        log(debug, fmt!("on_tcp_read_cb: incoming err.. name %? msg %?",
                        err_data.err_name, err_data.err_msg));
        let reader_ch = (*socket_data_ptr).reader_ch;
        core::comm::send(reader_ch, result::Err(err_data));
      }
      // do nothing .. unneeded buf
      0 => (),
      // have data
      _ => {
        // we have data
        log(debug, fmt!("tcp on_read_cb nread: %d", nread as int));
        let reader_ch = (*socket_data_ptr).reader_ch;
        let buf_base = uv::ll::get_base_from_buf(buf);
        // copy into an owned vector; the libuv buffer is freed below
        let new_bytes = vec::unsafe::from_buf(buf_base, nread as uint);
        core::comm::send(reader_ch, result::Ok(new_bytes));
      }
    }
    uv::ll::free_base_of_buf(buf);
    log(debug, ~"exiting on_tcp_read_cb");
}

G
Graydon Hoare 已提交
1104
extern fn on_alloc_cb(handle: *libc::c_void,
1105
                     ++suggested_size: size_t)
1106
    -> uv::ll::uv_buf_t unsafe {
1107
    log(debug, ~"tcp read on_alloc_cb!");
1108
    let char_ptr = uv::ll::malloc_buf_base_of(suggested_size);
P
Paul Stansifer 已提交
1109
    log(debug, fmt!("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
1110 1111
                     handle,
                     char_ptr as uint,
P
Paul Stansifer 已提交
1112
                     suggested_size as uint));
1113
    uv::ll::buf_init(char_ptr, suggested_size as uint)
1114
}
1115 1116

/// Data attached to a stream handle while it is being closed by
/// `tear_down_socket_data`.
type tcp_socket_close_data = {
    // signalled by `tcp_socket_dtor_close_cb` once the close has happened
    closed_ch: comm::Chan<()>
};

G
Graydon Hoare 已提交
1120
/// libuv close callback used by `tear_down_socket_data`: signals on the
/// `closed_ch` found in the handle's `tcp_socket_close_data`, unblocking
/// the waiting destructor.
extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let data = uv::ll::get_data_for_uv_handle(handle)
        as *tcp_socket_close_data;
    let closed_ch = (*data).closed_ch;
    core::comm::send(closed_ch, ());
    log(debug, ~"tcp_socket_dtor_close_cb exiting..");
}

G
Graydon Hoare 已提交
1128
/// libuv write-completion callback installed by `write_common_impl`.
///
/// Reports the outcome on the `result_ch` stored in the request's
/// `write_req_data`: `tcp_write_success` for a zero `status`, otherwise
/// `tcp_write_error` carrying the loop's last error.
extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
                              status: libc::c_int) unsafe {
    let write_data_ptr = uv::ll::get_data_for_req(write_req)
        as *write_req_data;
    if status == 0i32 {
        log(debug, ~"successful write complete");
        core::comm::send((*write_data_ptr).result_ch, tcp_write_success);
    } else {
        let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
            write_req);
        let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        log(debug, ~"failure to write");
        // convert explicitly, matching the error path in write_common_impl
        core::comm::send((*write_data_ptr).result_ch,
                         tcp_write_error(err_data.to_tcp_err()));
    }
}

/// Data attached to a libuv write request by `write_common_impl`.
type write_req_data = {
    // where the outcome of the write is reported
    result_ch: comm::Chan<tcp_write_result>
};

J
Jeff Olson 已提交
1150
/// Data read by the connect callbacks from a libuv connect request (and,
/// after a failed connect, from the stream handle).
type connect_req_data = {
    // receives the outcome of the connection attempt
    result_ch: comm::Chan<conn_attempt>,
    // signalled by `stream_error_close_cb` once a failed stream is closed
    closed_signal_ch: comm::Chan<()>
};

G
Graydon Hoare 已提交
1155
/// libuv close callback used after a failed connect: signals on the
/// `closed_signal_ch` found in the handle's `connect_req_data`.
extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let data = uv::ll::get_data_for_uv_handle(handle) as
        *connect_req_data;
    core::comm::send((*data).closed_signal_ch, ());
    log(debug, fmt!("exiting stream_error_close_cb for %?", handle));
}

G
Graydon Hoare 已提交
1162
/// libuv close callback for a client TCP handle; only logs the handle.
extern fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    log(debug, fmt!("closed client tcp handle %?", handle));
}

G
Graydon Hoare 已提交
1166
/// libuv connect callback.
///
/// Reports the outcome on the `result_ch` stored in the request's
/// `connect_req_data`. A zero `status` sends `conn_success`. Any other
/// status sends `conn_failure` with the loop's last error, re-attaches the
/// request data to the stream handle and closes the stream with
/// `stream_error_close_cb`.
extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
                                   status: libc::c_int) unsafe {
    let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
                      as *connect_req_data);
    let result_ch = (*conn_data_ptr).result_ch;
    log(debug, fmt!("tcp_connect result_ch %?", result_ch));
    let tcp_stream_ptr =
        uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
    match status {
      0i32 => {
        log(debug, ~"successful tcp connection!");
        core::comm::send(result_ch, conn_success);
      }
      _ => {
        log(debug, ~"error in tcp_connect_on_connect_cb");
        let loop_ptr = uv::ll::get_loop_for_uv_handle(tcp_stream_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        log(debug, fmt!("err_data %? %?", err_data.err_name,
                        err_data.err_msg));
        core::comm::send(result_ch, conn_failure(err_data));
        // the close cb looks the data up on the handle, not the request
        uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
                                       conn_data_ptr);
        uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
      }
    }
    log(debug, ~"leaving tcp_connect_on_connect_cb");
}

/// Outcome of a connection attempt, as sent by
/// `tcp_connect_on_connect_cb` on a `connect_req_data.result_ch`.
enum conn_attempt {
    // the connect callback reported status 0
    conn_success,
    // the connect callback reported an error; carries the raw libuv error
    conn_failure(uv::ll::uv_err_data)
}

/// Per-socket state shared between a `tcp_socket` and the libuv callbacks.
type tcp_socket_data = {
    // returned to readers by `read_start_common_impl`
    reader_po: comm::Port<result::Result<~[u8], tcp_err_data>>,
    // `on_tcp_read_cb` sends incoming data and read errors here
    reader_ch: comm::Chan<result::Result<~[u8], tcp_err_data>>,
    // the libuv stream; freed by `tear_down_socket_data`
    stream_handle_ptr: *uv::ll::uv_tcp_t,
    connect_req: uv::ll::uv_connect_t,
    // request struct reused by `write_common_impl` for each write
    write_req: uv::ll::uv_write_t,
    // the I/O task on which all libuv calls for this socket are made
    iotask: IoTask
};

1208 1209
/// Contents of a `tcp_socket_buf`: the wrapped socket plus the bytes that
/// have been received but not yet handed out by the `io::Reader` impl.
type tcp_buffered_socket_data = {
    sock: tcp_socket,
    mut buf: ~[u8]
};
J
Jeff Olson 已提交
1212

1213
//#[cfg(test)]
1214
mod test {
1215
    // FIXME don't run on fbsd or linux 32 bit (#2064)
1216 1217 1218 1219 1220 1221 1222 1223 1224 1225
    #[cfg(target_os="win32")]
    #[cfg(target_os="darwin")]
    #[cfg(target_os="linux")]
    mod tcp_ipv4_server_and_client_test {
        // Each test is a thin wrapper around the matching `impl_gl_*`
        // function; the two modules differ only in their `ignore`
        // attributes.
        #[cfg(target_arch="x86_64")]
        mod impl64 {
            #[test]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
            #[test]
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
            #[test]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
            }
            #[test]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
            }

        }
        #[cfg(target_arch="x86")]
        mod impl32 {
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            #[ignore(cfg(windows), reason = "deadlocking bots")]
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
            }
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
            }
        }
    }
1274
    fn impl_gl_tcp_ipv4_server_and_client() {
1275
        let hl_loop = uv::global_loop::get();
1276
        let server_ip = ~"127.0.0.1";
1277
        let server_port = 8888u;
1278 1279
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1280

1281 1282
        let server_result_po = core::comm::Port::<~str>();
        let server_result_ch = core::comm::Chan(server_result_po);
1283

1284 1285
        let cont_po = core::comm::Port::<()>();
        let cont_ch = core::comm::Chan(cont_po);
1286
        // server
1287
        do task::spawn_sched(task::ManualThreads(1u)) {
B
Brian Anderson 已提交
1288
            let actual_req = do comm::listen |server_ch| {
1289 1290 1291 1292
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
1293
                    server_ch,
1294 1295
                    cont_ch,
                    hl_loop)
1296 1297 1298
            };
            server_result_ch.send(actual_req);
        };
E
Eric Holk 已提交
1299
        core::comm::recv(cont_po);
1300
        // client
1301
        log(debug, ~"server started, firing up client..");
E
Eric Holk 已提交
1302
        let actual_resp_result = do core::comm::listen |client_ch| {
1303 1304 1305 1306
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1307 1308
                client_ch,
                hl_loop)
1309
        };
1310
        assert actual_resp_result.is_ok();
1311
        let actual_resp = actual_resp_result.get();
E
Eric Holk 已提交
1312
        let actual_req = core::comm::recv(server_result_po);
P
Paul Stansifer 已提交
1313 1314 1315 1316
        log(debug, fmt!("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, fmt!("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
1317 1318
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
1319
    }
1320
    fn impl_gl_tcp_ipv4_client_error_connection_refused() {
1321
        let hl_loop = uv::global_loop::get();
1322
        let server_ip = ~"127.0.0.1";
1323
        let server_port = 8889u;
1324
        let expected_req = ~"ping";
1325
        // client
1326
        log(debug, ~"firing up client..");
E
Eric Holk 已提交
1327
        let actual_resp_result = do core::comm::listen |client_ch| {
1328 1329 1330 1331 1332 1333 1334
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
                client_ch,
                hl_loop)
        };
1335
        match actual_resp_result.get_err() {
B
Brian Anderson 已提交
1336 1337
          connection_refused => (),
          _ => fail ~"unknown error.. expected connection_refused"
1338 1339
        }
    }
1340 1341
    fn impl_gl_tcp_ipv4_server_address_in_use() {
        let hl_loop = uv::global_loop::get();
1342
        let server_ip = ~"127.0.0.1";
1343
        let server_port = 8890u;
1344 1345
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1346

1347 1348
        let server_result_po = core::comm::Port::<~str>();
        let server_result_ch = core::comm::Chan(server_result_po);
1349

1350 1351
        let cont_po = core::comm::Port::<()>();
        let cont_ch = core::comm::Chan(cont_po);
1352
        // server
1353
        do task::spawn_sched(task::ManualThreads(1u)) {
B
Brian Anderson 已提交
1354
            let actual_req = do comm::listen |server_ch| {
1355
                run_tcp_test_server(
1356 1357 1358 1359
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
1360 1361
                    cont_ch,
                    hl_loop)
1362 1363 1364
            };
            server_result_ch.send(actual_req);
        };
E
Eric Holk 已提交
1365
        core::comm::recv(cont_po);
1366 1367 1368 1369 1370 1371
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        // client.. just doing this so that the first server tears down
1372
        log(debug, ~"server started, firing up client..");
E
Eric Holk 已提交
1373
        do core::comm::listen |client_ch| {
1374 1375 1376 1377
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1378 1379
                client_ch,
                hl_loop)
1380
        };
1381
        match listen_err {
B
Brian Anderson 已提交
1382
          address_in_use => {
1383 1384
            assert true;
          }
B
Brian Anderson 已提交
1385
          _ => {
1386
            fail ~"expected address_in_use listen error,"+
B
Brian Anderson 已提交
1387
                ~"but got a different error varient. check logs.";
1388 1389 1390 1391 1392
          }
        }
    }
    fn impl_gl_tcp_ipv4_server_access_denied() {
        let hl_loop = uv::global_loop::get();
1393
        let server_ip = ~"127.0.0.1";
1394 1395 1396 1397 1398 1399
        let server_port = 80u;
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
1400
        match listen_err {
B
Brian Anderson 已提交
1401
          access_denied => {
1402 1403
            assert true;
          }
B
Brian Anderson 已提交
1404
          _ => {
1405 1406
            fail ~"expected address_in_use listen error,"+
                      ~"but got a different error varient. check logs.";
1407 1408 1409
          }
        }
    }
1410
    // Intended to exercise `tcp_socket_buf` through the `io::Reader` and
    // `io::Writer` impls. The whole body is commented out, so this test
    // currently does nothing and always passes.
    fn impl_gl_tcp_ipv4_server_client_reader_writer() {
        /*
         XXX: Causes an ICE.

        let iotask = uv::global_loop::get();
        let server_ip = ~"127.0.0.1";
        let server_port = 8891u;
        let expected_req = ~"ping";
        let expected_resp = ~"pong";

        let server_result_po = core::comm::port::<~str>();
        let server_result_ch = core::comm::chan(server_result_po);

        let cont_po = core::comm::port::<()>();
        let cont_ch = core::comm::chan(cont_po);
        // server
        do task::spawn_sched(task::ManualThreads(1u)) {
            let actual_req = do comm::listen |server_ch| {
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
                    cont_ch,
                    iotask)
            };
            server_result_ch.send(actual_req);
        };
        core::comm::recv(cont_po);
        // client
        let server_addr = ip::v4::parse_addr(server_ip);
        let conn_result = connect(server_addr, server_port, iotask);
        if result::is_err(conn_result) {
            assert false;
        }
        let sock_buf = @socket_buf(result::unwrap(conn_result));
        buf_write(sock_buf, expected_req);

        // so contrived!
        let actual_resp = do str::as_bytes(expected_resp) |resp_buf| {
            buf_read(sock_buf, vec::len(resp_buf))
        };

        let actual_req = core::comm::recv(server_result_po);
        log(debug, fmt!("REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req));
        log(debug, fmt!("RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp));
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
        */
    }
1462

1463
    fn buf_write<W:io::Writer>(+w: &W, val: ~str) {
P
Paul Stansifer 已提交
1464
        log(debug, fmt!("BUF_WRITE: val len %?", str::len(val)));
1465
        do str::byte_slice(val) |b_slice| {
P
Paul Stansifer 已提交
1466 1467
            log(debug, fmt!("BUF_WRITE: b_slice len %?",
                            vec::len(b_slice)));
1468
            w.write(b_slice)
1469 1470 1471
        }
    }

1472 1473
    // Test helper: calls `read_bytes(len)` on `r` and returns the result
    // converted to a string.
    fn buf_read<R:io::Reader>(+r: &R, len: uint) -> ~str {
        let new_bytes = (*r).read_bytes(len);
        log(debug, fmt!("in buf_read.. new_bytes len: %?",
                        vec::len(new_bytes)));
        str::from_bytes(new_bytes)
    }
1478

1479
    fn run_tcp_test_server(server_ip: ~str, server_port: uint, resp: ~str,
1480 1481
                          server_ch: comm::Chan<~str>,
                          cont_ch: comm::Chan<()>,
B
Brian Anderson 已提交
1482
                          iotask: IoTask) -> ~str {
1483 1484 1485
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1486
            |kill_ch| {
P
Paul Stansifer 已提交
1487 1488
                log(debug, fmt!("establish_cb %?",
                    kill_ch));
E
Eric Holk 已提交
1489
                core::comm::send(cont_ch, ());
1490 1491 1492
            },
            // risky to run this on the loop, but some users
            // will want the POWER
1493
            |new_conn, kill_ch| {
1494
            log(debug, ~"SERVER: new connection!");
1495
            do comm::listen |cont_ch| {
1496
                do task::spawn_sched(task::ManualThreads(1u)) {
1497
                    log(debug, ~"SERVER: starting worker for new req");
1498 1499

                    let accept_result = accept(new_conn);
1500
                    log(debug, ~"SERVER: after accept()");
1501
                    if result::is_err(accept_result) {
1502
                        log(debug, ~"SERVER: error accept connection");
1503
                        let err_data = result::get_err(accept_result);
B
Brian Anderson 已提交
1504
                        core::comm::send(kill_ch, Some(err_data));
1505
                        log(debug,
1506
                            ~"SERVER/WORKER: send on err cont ch");
1507 1508 1509 1510
                        cont_ch.send(());
                    }
                    else {
                        log(debug,
1511
                            ~"SERVER/WORKER: send on cont ch");
1512 1513
                        cont_ch.send(());
                        let sock = result::unwrap(accept_result);
1514 1515
                        log(debug, ~"SERVER: successfully accepted"+
                            ~"connection!");
1516
                        let received_req_bytes = read(sock, 0u);
1517
                        match received_req_bytes {
1518
                          result::Ok(data) => {
1519
                            log(debug, ~"SERVER: got REQ str::from_bytes..");
P
Paul Stansifer 已提交
1520 1521
                            log(debug, fmt!("SERVER: REQ data len: %?",
                                            vec::len(data)));
1522 1523
                            server_ch.send(
                                str::from_bytes(data));
1524
                            log(debug, ~"SERVER: before write");
1525
                            tcp_write_single(sock, str::to_bytes(resp));
1526
                            log(debug, ~"SERVER: after write.. die");
B
Brian Anderson 已提交
1527
                            core::comm::send(kill_ch, None);
1528
                          }
1529
                          result::Err(err_data) => {
P
Paul Stansifer 已提交
1530 1531
                            log(debug, fmt!("SERVER: error recvd: %s %s",
                                err_data.err_name, err_data.err_msg));
B
Brian Anderson 已提交
1532
                            core::comm::send(kill_ch, Some(err_data));
1533
                            server_ch.send(~"");
1534
                          }
1535
                        }
1536
                        log(debug, ~"SERVER: worker spinning down");
1537
                    }
1538
                }
1539
                log(debug, ~"SERVER: waiting to recv on cont_ch");
1540 1541
                cont_ch.recv()
            };
1542
            log(debug, ~"SERVER: recv'd on cont_ch..leaving listen cb");
1543 1544 1545
        });
        // err check on listen_result
        if result::is_err(listen_result) {
1546
            match result::get_err(listen_result) {
B
Brian Anderson 已提交
1547
              generic_listen_err(name, msg) => {
P
Paul Stansifer 已提交
1548 1549
                fail fmt!("SERVER: exited abnormally name %s msg %s",
                                name, msg);
1550
              }
B
Brian Anderson 已提交
1551
              access_denied => {
1552
                fail ~"SERVER: exited abnormally, got access denied..";
1553
              }
B
Brian Anderson 已提交
1554
              address_in_use => {
1555
                fail ~"SERVER: exited abnormally, got address in use...";
1556
              }
1557
            }
1558
        }
1559
        let ret_val = server_ch.recv();
P
Paul Stansifer 已提交
1560
        log(debug, fmt!("SERVER: exited and got return val: '%s'", ret_val));
1561 1562 1563
        ret_val
    }

1564
    fn run_tcp_test_server_fail(server_ip: ~str, server_port: uint,
B
Brian Anderson 已提交
1565
                          iotask: IoTask) -> tcp_listen_err_data {
1566 1567 1568
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1569
            |kill_ch| {
P
Paul Stansifer 已提交
1570 1571
                log(debug, fmt!("establish_cb %?",
                    kill_ch));
1572
            },
1573
            |new_conn, kill_ch| {
P
Paul Stansifer 已提交
1574 1575
                fail fmt!("SERVER: shouldn't be called.. %? %?",
                           new_conn, kill_ch);
1576 1577
        });
        // err check on listen_result
1578
        if result::is_err(listen_result) {
1579 1580 1581
            result::get_err(listen_result)
        }
        else {
1582
            fail ~"SERVER: did not fail as expected"
1583 1584 1585
        }
    }

1586
    fn run_tcp_test_client(server_ip: ~str, server_port: uint, resp: ~str,
1587
                          client_ch: comm::Chan<~str>,
B
Brian Anderson 已提交
1588
                          iotask: IoTask) -> result::Result<~str,
1589
                                                    tcp_connect_err_data> {
1590
        let server_ip_addr = ip::v4::parse_addr(server_ip);
1591

1592
        log(debug, ~"CLIENT: starting..");
1593
        let connect_result = connect(server_ip_addr, server_port, iotask);
1594
        if result::is_err(connect_result) {
1595
            log(debug, ~"CLIENT: failed to connect");
1596
            let err_data = result::get_err(connect_result);
1597
            Err(err_data)
1598
        }
1599 1600
        else {
            let sock = result::unwrap(connect_result);
1601
            let resp_bytes = str::to_bytes(resp);
1602
            tcp_write_single(sock, resp_bytes);
1603
            let read_result = sock.read(0u);
1604
            if read_result.is_err() {
1605
                log(debug, ~"CLIENT: failure to read");
1606
                Ok(~"")
1607
            }
1608 1609 1610
            else {
                client_ch.send(str::from_bytes(read_result.get()));
                let ret_val = client_ch.recv();
P
Paul Stansifer 已提交
1611 1612
                log(debug, fmt!("CLIENT: after client_ch recv ret: '%s'",
                   ret_val));
1613
                Ok(ret_val)
1614 1615
            }
        }
1616 1617
    }

1618
    fn tcp_write_single(sock: tcp_socket, val: ~[u8]) {
1619 1620
        let write_result_future = sock.write_future(val);
        let write_result = write_result_future.get();
1621
        if result::is_err(write_result) {
1622
            log(debug, ~"tcp_write_single: write failed!");
1623
            let err_data = result::get_err(write_result);
P
Paul Stansifer 已提交
1624 1625
            log(debug, fmt!("tcp_write_single err name: %s msg: %s",
                err_data.err_name, err_data.err_msg));
1626
            // meh. torn on what to do here.
1627
            fail ~"tcp_write_single failed";
1628
        }
1629
    }
1630
}