net_tcp.rs 59.5 KB
Newer Older
1
//! High-level interface to libuv's TCP functionality
2

J
Jeff Olson 已提交
3
import ip = net_ip;
4 5
import uv::iotask;
import uv::iotask::iotask;
6
import future_spawn = future::spawn;
J
Jeff Olson 已提交
7 8
// FIXME #1935
// should be able to, but can't atm, replace w/ result::{result, extensions};
9 10
import result::*;
import libc::size_t;
11
import io::{Reader, Writer};
E
Eric Holk 已提交
12
import comm = core::comm;
J
Jeff Olson 已提交
13

14
// tcp interfaces
15
export tcp_socket;
16 17
// buffered socket
export tcp_socket_buf, socket_buf;
18 19
// errors
export tcp_err_data, tcp_connect_err_data;
20
// operations on a tcp_socket
21
export write, write_future, read_start, read_stop;
22
// tcp server stuff
23
export listen, accept;
24 25
// tcp client stuff
export connect;
J
Jeff Olson 已提交
26

27
#[nolink]
28
extern mod rustrt {
29
    fn rust_uv_current_kernel_malloc(size: libc::c_uint) -> *libc::c_void;
30
    fn rust_uv_current_kernel_free(mem: *libc::c_void);
31
    fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
32 33
}

34 35 36 37 38 39 40
/**
 * Encapsulates an open TCP/IP connection through libuv
 *
 * `tcp_socket` is non-copyable/sendable and automagically handles closing the
 * underlying libuv data structures when it goes out of scope. This is the
 * data structure that is used for read/write operations over a TCP stream.
 */
B
Brian Anderson 已提交
41
struct tcp_socket {
42 43 44
  let socket_data: @tcp_socket_data;
  new(socket_data: @tcp_socket_data) { self.socket_data = socket_data; }
  drop {
45
    unsafe {
46
        tear_down_socket_data(self.socket_data)
47 48
    }
  }
J
Jeff Olson 已提交
49 50
}

51 52 53 54
/**
 * A buffered wrapper for `net::tcp::tcp_socket`
 *
 * It is created with a call to `net::tcp::socket_buf()` and has impls that
55
 * satisfy both the `io::reader` and `io::writer` traits.
56
 */
B
Brian Anderson 已提交
57
struct tcp_socket_buf {
58 59
  let data: @tcp_buffered_socket_data;
  new(data: @tcp_buffered_socket_data) { self.data = data; }
60 61
}

62
/// Contains raw, string-based, error information returned from libuv
63
type tcp_err_data = {
64 65
    err_name: ~str,
    err_msg: ~str
66
};
67
/// Details returned as part of a `result::err` result from `tcp::listen`
68
enum tcp_listen_err_data {
69 70 71 72
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
73
    generic_listen_err(~str, ~str),
74 75 76 77 78 79 80
    /**
     * Failed to bind to the requested IP/Port, because it is already in use.
     *
     * # Possible Causes
     *
     * * Attempting to bind to a port already bound to another listener
     */
81
    address_in_use,
82 83 84 85 86 87 88 89 90 91
    /**
     * Request to bind to an IP/Port was denied by the system.
     *
     * # Possible Causes
     *
     * * Attemping to binding to an IP/Port as a non-Administrator
     *   on Windows Vista+
     * * Attempting to bind, as a non-priv'd
     *   user, to 'privileged' ports (< 1024) on *nix
     */
92 93
    access_denied
}
94
/// Details returned as part of a `result::err` result from `tcp::connect`
95
enum tcp_connect_err_data {
96 97 98 99
    /**
     * Some unplanned-for error. The first and second fields correspond
     * to libuv's `err_name` and `err_msg` fields, respectively.
     */
100
    generic_connect_err(~str, ~str),
101
    /// Invalid IP or invalid port
102 103
    connection_refused
}
104

105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
/**
 * Initiate a client connection over TCP/IP
 *
 * # Arguments
 *
 * * `input_ip` - The IP address (versions 4 or 6) of the remote host
 * * `port` - the unsigned integer of the desired remote host port
 * * `iotask` - a `uv::iotask` that the tcp request will run on
 *
 * # Returns
 *
 * A `result` that, if the operation succeeds, contains a
 * `net::net::tcp_socket` that can be used to send and receive data to/from
 * the remote host. In the event of failure, a
 * `net::tcp::tcp_connect_err_data` instance will be returned
 */
121
fn connect(-input_ip: ip::ip_addr, port: uint,
122
           iotask: iotask)
123
    -> result::result<tcp_socket, tcp_connect_err_data> unsafe {
E
Eric Holk 已提交
124 125
    let result_po = core::comm::port::<conn_attempt>();
    let closed_signal_po = core::comm::port::<()>();
J
Jeff Olson 已提交
126
    let conn_data = {
E
Eric Holk 已提交
127 128
        result_ch: core::comm::chan(result_po),
        closed_signal_ch: core::comm::chan(closed_signal_po)
J
Jeff Olson 已提交
129 130
    };
    let conn_data_ptr = ptr::addr_of(conn_data);
E
Eric Holk 已提交
131
    let reader_po = core::comm::port::<result::result<~[u8], tcp_err_data>>();
132 133 134
    let stream_handle_ptr = malloc_uv_tcp_t();
    *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
    let socket_data = @{
135
        reader_po: reader_po,
E
Eric Holk 已提交
136
        reader_ch: core::comm::chan(reader_po),
137 138 139
        stream_handle_ptr: stream_handle_ptr,
        connect_req: uv::ll::connect_t(),
        write_req: uv::ll::write_t(),
140
        iotask: iotask
J
Jeff Olson 已提交
141
    };
142
    let socket_data_ptr = ptr::addr_of(*socket_data);
143
    log(debug, fmt!{"tcp_connect result_ch %?", conn_data.result_ch});
J
Jeff Olson 已提交
144 145
    // get an unsafe representation of our stream_handle_ptr that
    // we can send into the interact cb to be handled in libuv..
146 147
    log(debug, fmt!{"stream_handle_ptr outside interact %?",
        stream_handle_ptr});
B
Brian Anderson 已提交
148
    do iotask::interact(iotask) |loop_ptr| {
149
        log(debug, ~"in interact cb for tcp client connect..");
150 151
        log(debug, fmt!{"stream_handle_ptr in interact %?",
            stream_handle_ptr});
152
        match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
B
Brian Anderson 已提交
153
          0i32 => {
154
            log(debug, ~"tcp_init successful");
155
            match input_ip {
B
Brian Anderson 已提交
156
              ipv4 => {
157
                log(debug, ~"dealing w/ ipv4 connection..");
J
Jeff Olson 已提交
158 159
                let connect_req_ptr =
                    ptr::addr_of((*socket_data_ptr).connect_req);
160
                let addr_str = ip::format_addr(input_ip);
161
                let connect_result = match input_ip {
B
Brian Anderson 已提交
162
                  ip::ipv4(addr) => {
163 164 165 166 167
                    // have to "recreate" the sockaddr_in/6
                    // since the ip_addr discards the port
                    // info.. should probably add an additional
                    // rust type that actually is closer to
                    // what the libuv API expects (ip str + port num)
168
                    log(debug, fmt!{"addr: %?", addr});
169 170 171 172 173 174 175
                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                    uv::ll::tcp_connect(
                        connect_req_ptr,
                        stream_handle_ptr,
                        ptr::addr_of(in_addr),
                        tcp_connect_on_connect_cb)
                  }
B
Brian Anderson 已提交
176
                  ip::ipv6(addr) => {
177
                    log(debug, fmt!{"addr: %?", addr});
178 179 180 181 182 183 184 185
                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                    uv::ll::tcp_connect6(
                        connect_req_ptr,
                        stream_handle_ptr,
                        ptr::addr_of(in_addr),
                        tcp_connect_on_connect_cb)
                  }
                };
186
                match connect_result {
B
Brian Anderson 已提交
187
                  0i32 => {
188
                    log(debug, ~"tcp_connect successful");
J
Jeff Olson 已提交
189 190 191
                    // reusable data that we'll have for the
                    // duration..
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
192 193
                                               socket_data_ptr as
                                                  *libc::c_void);
J
Jeff Olson 已提交
194 195 196 197
                    // just so the connect_cb can send the
                    // outcome..
                    uv::ll::set_data_for_req(connect_req_ptr,
                                             conn_data_ptr);
198
                    log(debug, ~"leaving tcp_connect interact cb...");
J
Jeff Olson 已提交
199 200 201
                    // let tcp_connect_on_connect_cb send on
                    // the result_ch, now..
                  }
B
Brian Anderson 已提交
202
                  _ => {
J
Jeff Olson 已提交
203 204 205
                    // immediate connect failure.. probably a garbage
                    // ip or somesuch
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
206
                    core::comm::send((*conn_data_ptr).result_ch,
207
                               conn_failure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
208 209 210 211 212 213 214 215
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                                   conn_data_ptr);
                    uv::ll::close(stream_handle_ptr, stream_error_close_cb);
                  }
                }
              }
            }
        }
B
Brian Anderson 已提交
216
          _ => {
J
Jeff Olson 已提交
217 218
            // failure to create a tcp handle
            let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
219
            core::comm::send((*conn_data_ptr).result_ch,
220
                       conn_failure(err_data.to_tcp_err()));
J
Jeff Olson 已提交
221 222 223
          }
        }
    };
E
Eric Holk 已提交
224
    match core::comm::recv(result_po) {
B
Brian Anderson 已提交
225
      conn_success => {
226
        log(debug, ~"tcp::connect - received success on result_po");
227
        result::ok(tcp_socket(socket_data))
J
Jeff Olson 已提交
228
      }
B
Brian Anderson 已提交
229
      conn_failure(err_data) => {
E
Eric Holk 已提交
230
        core::comm::recv(closed_signal_po);
231
        log(debug, ~"tcp::connect - received failure on result_po");
232 233 234
        // still have to free the malloc'd stream handle..
        rustrt::rust_uv_current_kernel_free(stream_handle_ptr
                                           as *libc::c_void);
235
        let tcp_conn_err = match err_data.err_name {
B
Brian Anderson 已提交
236 237
          ~"ECONNREFUSED" => connection_refused,
          _ => generic_connect_err(err_data.err_name, err_data.err_msg)
238 239
        };
        result::err(tcp_conn_err)
J
Jeff Olson 已提交
240 241 242
      }
    }
}
243

244 245 246 247 248 249
/**
 * Write binary data to a tcp stream; Blocks until operation completes
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
250
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
251 252 253 254 255 256 257
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `result` object with a `nil` value as the `ok` variant, or a
 * `tcp_err_data` value as the `err` variant
 */
258
fn write(sock: tcp_socket, raw_write_data: ~[u8])
259
    -> result::result<(), tcp_err_data> unsafe {
260
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
261 262 263
    write_common_impl(socket_data_ptr, raw_write_data)
}

264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285
/**
 * Write binary data to tcp stream; Returns a `future::future` value
 * immediately
 *
 * # Safety
 *
 * This function can produce unsafe results if:
 *
 * 1. the call to `write_future` is made
 * 2. the `future::future` value returned is never resolved via
 * `future::get`
 * 3. and then the `tcp_socket` passed in to `write_future` leaves
 * scope and is destructed before the task that runs the libuv write
 * operation completes.
 *
 * As such: If using `write_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released write
 * handle. Otherwise, use the blocking `tcp::write` function instead.
 *
 * # Arguments
 *
 * * sock - a `tcp_socket` to write to
286
 * * raw_write_data - a vector of `~[u8]` that will be written to the stream.
287 288 289 290 291 292 293 294
 * This value must remain valid for the duration of the `write` call
 *
 * # Returns
 *
 * A `future` value that, once the `write` operation completes, resolves to a
 * `result` object with a `nil` value as the `ok` variant, or a `tcp_err_data`
 * value as the `err` variant
 */
295
fn write_future(sock: tcp_socket, raw_write_data: ~[u8])
296
    -> future::Future<result::result<(), tcp_err_data>> unsafe {
297
    let socket_data_ptr = ptr::addr_of(*(sock.socket_data));
298
    do future_spawn {
299
        let data_copy = copy(raw_write_data);
300
        write_common_impl(socket_data_ptr, data_copy)
301
    }
302 303
}

304 305 306 307 308 309 310 311 312 313 314
/**
 * Begin reading binary data from an open TCP connection; used with
 * `read_stop`
 *
 * # Arguments
 *
 * * sock -- a `net::tcp::tcp_socket` for the connection to read from
 *
 * # Returns
 *
 * * A `result` instance that will either contain a
E
Eric Holk 已提交
315 316 317
 * `core::comm::port<tcp_read_result>` that the user can read (and
 * optionally, loop on) from until `read_stop` is called, or a
 * `tcp_err_data` record
318
 */
319
fn read_start(sock: tcp_socket)
320
    -> result::result<comm::Port<
321
        result::result<~[u8], tcp_err_data>>, tcp_err_data> unsafe {
322
    let socket_data = ptr::addr_of(*(sock.socket_data));
323
    read_start_common_impl(socket_data)
324
}
325

326 327 328 329 330 331 332
/**
 * Stop reading from an open TCP connection; used with `read_start`
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on
 */
333
fn read_stop(sock: tcp_socket,
334
             -read_port: comm::Port<result::result<~[u8], tcp_err_data>>) ->
335
    result::result<(), tcp_err_data> unsafe {
336
    log(debug, fmt!{"taking the read_port out of commission %?", read_port});
337
    let socket_data = ptr::addr_of(*sock.socket_data);
338 339 340
    read_stop_common_impl(socket_data)
}

341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
/**
 * Reads a single chunk of data from `tcp_socket`; block until data/error
 * recv'd
 *
 * Does a blocking read operation for a single chunk of data from a
 * `tcp_socket` until a data arrives or an error is received. The provided
 * `timeout_msecs` value is used to raise an error if the timeout period
 * passes without any data received.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
356
fn read(sock: tcp_socket, timeout_msecs: uint)
357
    -> result::result<~[u8],tcp_err_data> {
358
    let socket_data = ptr::addr_of(*(sock.socket_data));
359 360 361
    read_common_impl(socket_data, timeout_msecs)
}

362
/**
363
 * Reads a single chunk of data; returns a `future::future<~[u8]>`
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
 * immediately
 *
 * Does a non-blocking read operation for a single chunk of data from a
 * `tcp_socket` and immediately returns a `future` value representing the
 * result. When resolving the returned `future`, it will block until data
 * arrives or an error is received. The provided `timeout_msecs`
 * value is used to raise an error if the timeout period passes without any
 * data received.
 *
 * # Safety
 *
 * This function can produce unsafe results if the call to `read_future` is
 * made, the `future::future` value returned is never resolved via
 * `future::get`, and then the `tcp_socket` passed in to `read_future` leaves
 * scope and is destructed before the task that runs the libuv read
 * operation completes.
 *
 * As such: If using `read_future`, always be sure to resolve the returned
 * `future` so as to ensure libuv doesn't try to access a released read
 * handle. Otherwise, use the blocking `tcp::read` function instead.
 *
 * # Arguments
 *
 * * `sock` - a `net::tcp::tcp_socket` that you wish to read from
 * * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
 * read attempt. Pass `0u` to wait indefinitely
 */
391
fn read_future(sock: tcp_socket, timeout_msecs: uint)
392
    -> future::Future<result::result<~[u8],tcp_err_data>> {
393
    let socket_data = ptr::addr_of(*(sock.socket_data));
394
    do future_spawn {
395
        read_common_impl(socket_data, timeout_msecs)
396
    }
397
}
398

399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
/**
 * Bind an incoming client connection to a `net::tcp::tcp_socket`
 *
 * # Notes
 *
 * It is safe to call `net::tcp::accept` _only_ within the context of the
 * `new_connect_cb` callback provided as the final argument to the
 * `net::tcp::listen` function.
 *
 * The `new_conn` opaque value is provided _only_ as the first argument to the
 * `new_connect_cb` provided as a part of `net::tcp::listen`.
 * It can be safely sent to another task but it _must_ be
 * used (via `net::tcp::accept`) before the `new_connect_cb` call it was
 * provided to returns.
 *
 * This implies that a port/chan pair must be used to make sure that the
 * `new_connect_cb` call blocks until an attempt to create a
 * `net::tcp::tcp_socket` is completed.
 *
 * # Example
 *
 * Here, the `new_conn` is used in conjunction with `accept` from within
 * a task spawned by the `new_connect_cb` passed into `listen`
 *
 * ~~~~~~~~~~~
 * net::tcp::listen(remote_ip, remote_port, backlog)
 *     // this callback is ran once after the connection is successfully
 *     // set up
 *     {|kill_ch|
 *       // pass the kill_ch to your main loop or wherever you want
 *       // to be able to externally kill the server from
 *     }
 *     // this callback is ran when a new connection arrives
 *     {|new_conn, kill_ch|
E
Eric Holk 已提交
433 434
 *     let cont_po = core::comm::port::<option<tcp_err_data>>();
 *     let cont_ch = core::comm::chan(cont_po);
435 436 437
 *     task::spawn {||
 *         let accept_result = net::tcp::accept(new_conn);
 *         if accept_result.is_err() {
E
Eric Holk 已提交
438
 *             core::comm::send(cont_ch, result::get_err(accept_result));
439 440 441 442
 *             // fail?
 *         }
 *         else {
 *             let sock = result::get(accept_result);
E
Eric Holk 已提交
443
 *             core::comm::send(cont_ch, true);
444 445 446
 *             // do work here
 *         }
 *     };
E
Eric Holk 已提交
447
 *     match core::comm::recv(cont_po) {
448
 *       // shut down listen()
E
Eric Holk 已提交
449
 *       some(err_data) { core::comm::send(kill_chan, some(err_data)) }
450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467
 *       // wait for next connection
 *       none {}
 *     }
 * };
 * ~~~~~~~~~~~
 *
 * # Arguments
 *
 * * `new_conn` - an opaque value used to create a new `tcp_socket`
 *
 * # Returns
 *
 * On success, this function will return a `net::tcp::tcp_socket` as the
 * `ok` variant of a `result`. The `net::tcp::tcp_socket` is anchored within
 * the task that `accept` was called within for its lifetime. On failure,
 * this function will return a `net::tcp::tcp_err_data` record
 * as the `err` variant of a `result`.
 */
468 469 470
fn accept(new_conn: tcp_new_connection)
    -> result::result<tcp_socket, tcp_err_data> unsafe {

471
    match new_conn{
B
Brian Anderson 已提交
472
      new_tcp_conn(server_handle_ptr) => {
473
        let server_data_ptr = uv::ll::get_data_for_uv_handle(
474
            server_handle_ptr) as *tcp_listen_fc_data;
E
Eric Holk 已提交
475
        let reader_po = core::comm::port();
476
        let iotask = (*server_data_ptr).iotask;
477 478 479
        let stream_handle_ptr = malloc_uv_tcp_t();
        *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
        let client_socket_data = @{
480
            reader_po: reader_po,
E
Eric Holk 已提交
481
            reader_ch: core::comm::chan(reader_po),
482
            stream_handle_ptr : stream_handle_ptr,
483 484
            connect_req : uv::ll::connect_t(),
            write_req : uv::ll::write_t(),
485
            iotask : iotask
486
        };
487 488 489
        let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
        let client_stream_handle_ptr =
            (*client_socket_data_ptr).stream_handle_ptr;
490

E
Eric Holk 已提交
491 492
        let result_po = core::comm::port::<option<tcp_err_data>>();
        let result_ch = core::comm::chan(result_po);
493 494 495 496 497 498 499

        // UNSAFE LIBUV INTERACTION BEGIN
        // .. normally this happens within the context of
        // a call to uv::hl::interact.. but we're breaking
        // the rules here because this always has to be
        // called within the context of a listen() new_connect_cb
        // callback (or it will likely fail and drown your cat)
500
        log(debug, ~"in interact cb for tcp::accept");
501 502
        let loop_ptr = uv::ll::get_loop_for_uv_handle(
            server_handle_ptr);
503
        match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
B
Brian Anderson 已提交
504
          0i32 => {
505
            log(debug, ~"uv_tcp_init successful for client stream");
506
            match uv::ll::accept(
507 508
                server_handle_ptr as *libc::c_void,
                client_stream_handle_ptr as *libc::c_void) {
B
Brian Anderson 已提交
509
              0i32 => {
510
                log(debug, ~"successfully accepted client connection");
511
                uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
512 513
                                               client_socket_data_ptr
                                                   as *libc::c_void);
E
Eric Holk 已提交
514
                core::comm::send(result_ch, none);
515
              }
B
Brian Anderson 已提交
516
              _ => {
517
                log(debug, ~"failed to accept client conn");
E
Eric Holk 已提交
518
                core::comm::send(result_ch, some(
519 520 521
                    uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
              }
            }
522
          }
B
Brian Anderson 已提交
523
          _ => {
524
            log(debug, ~"failed to init client stream");
E
Eric Holk 已提交
525
            core::comm::send(result_ch, some(
526 527 528 529
                uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
          }
        }
        // UNSAFE LIBUV INTERACTION END
E
Eric Holk 已提交
530
        match core::comm::recv(result_po) {
B
Brian Anderson 已提交
531 532
          some(err_data) => result::err(err_data),
          none => result::ok(tcp_socket(client_socket_data))
533 534 535 536 537
        }
      }
    }
}

538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555
/**
 * Bind to a given IP/port and listen for new connections
 *
 * # Arguments
 *
 * * `host_ip` - a `net::ip::ip_addr` representing a unique IP
 * (versions 4 or 6)
 * * `port` - a uint representing the port to listen on
 * * `backlog` - a uint representing the number of incoming connections
 * to cache in memory
 * * `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
 * * `on_establish_cb` - a callback that is evaluated if/when the listener
 * is successfully established. it takes no parameters
 * * `new_connect_cb` - a callback to be evaluated, on the libuv thread,
 * whenever a client attempts to conect on the provided ip/port. the
 * callback's arguments are:
 *     * `new_conn` - an opaque type that can be passed to
 *     `net::tcp::accept` in order to be converted to a `tcp_socket`.
E
Eric Holk 已提交
556 557
 *     * `kill_ch` - channel of type `core::comm::chan<option<tcp_err_data>>`.
 *     this channel can be used to send a message to cause `listen` to begin
558 559 560 561 562 563 564 565
 *     closing the underlying libuv data structures.
 *
 * # returns
 *
 * a `result` instance containing empty data of type `()` on a
 * successful/normal shutdown, and a `tcp_listen_err_data` enum in the event
 * of listen exiting because of an error
 */
566
fn listen(-host_ip: ip::ip_addr, port: uint, backlog: uint,
567
          iotask: iotask,
568
          on_establish_cb: fn~(comm::Chan<option<tcp_err_data>>),
569
          +new_connect_cb: fn~(tcp_new_connection,
570
                               comm::Chan<option<tcp_err_data>>))
571
    -> result::result<(), tcp_listen_err_data> unsafe {
572
    do listen_common(host_ip, port, backlog, iotask, on_establish_cb)
573
        // on_connect_cb
574
        |handle| {
575 576 577 578 579 580 581 582
            let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
                as *tcp_listen_fc_data;
            let new_conn = new_tcp_conn(handle);
            let kill_ch = (*server_data_ptr).kill_ch;
            new_connect_cb(new_conn, kill_ch);
    }
}

583
fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint,
584
          iotask: iotask,
585
          on_establish_cb: fn~(comm::Chan<option<tcp_err_data>>),
586
          -on_connect_cb: fn~(*uv::ll::uv_tcp_t))
587
    -> result::result<(), tcp_listen_err_data> unsafe {
E
Eric Holk 已提交
588 589 590
    let stream_closed_po = core::comm::port::<()>();
    let kill_po = core::comm::port::<option<tcp_err_data>>();
    let kill_ch = core::comm::chan(kill_po);
591 592 593 594
    let server_stream = uv::ll::tcp_t();
    let server_stream_ptr = ptr::addr_of(server_stream);
    let server_data = {
        server_stream_ptr: server_stream_ptr,
E
Eric Holk 已提交
595
        stream_closed_ch: core::comm::chan(stream_closed_po),
596
        kill_ch: kill_ch,
597
        on_connect_cb: on_connect_cb,
598
        iotask: iotask,
599 600 601 602
        mut active: true
    };
    let server_data_ptr = ptr::addr_of(server_data);

E
Eric Holk 已提交
603
    let setup_result = do core::comm::listen |setup_ch| {
J
Jeff Olson 已提交
604
        // this is to address a compiler warning about
605 606 607 608
        // an implicit copy.. it seems that double nested
        // will defeat a move sigil, as is done to the host_ip
        // arg above.. this same pattern works w/o complaint in
        // tcp::connect (because the iotask::interact cb isn't
E
Eric Holk 已提交
609
        // nested within a core::comm::listen block)
610
        let loc_ip = copy(host_ip);
611
        do iotask::interact(iotask) |loop_ptr| {
612
            match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
B
Brian Anderson 已提交
613
              0i32 => {
614 615 616
                uv::ll::set_data_for_uv_handle(
                    server_stream_ptr,
                    server_data_ptr);
617
                let addr_str = ip::format_addr(loc_ip);
618
                let bind_result = match loc_ip {
B
Brian Anderson 已提交
619
                  ip::ipv4(addr) => {
620
                    log(debug, fmt!{"addr: %?", addr});
621 622 623 624
                    let in_addr = uv::ll::ip4_addr(addr_str, port as int);
                    uv::ll::tcp_bind(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
B
Brian Anderson 已提交
625
                  ip::ipv6(addr) => {
626
                    log(debug, fmt!{"addr: %?", addr});
627 628 629 630 631
                    let in_addr = uv::ll::ip6_addr(addr_str, port as int);
                    uv::ll::tcp_bind6(server_stream_ptr,
                                     ptr::addr_of(in_addr))
                  }
                };
632
                match bind_result {
B
Brian Anderson 已提交
633
                  0i32 => {
634
                    match uv::ll::listen(server_stream_ptr,
635 636
                                       backlog as libc::c_int,
                                       tcp_lfc_on_connection_cb) {
E
Eric Holk 已提交
637
                      0i32 => core::comm::send(setup_ch, none),
B
Brian Anderson 已提交
638
                      _ => {
639
                        log(debug, ~"failure to uv_listen()");
640
                        let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
641
                        core::comm::send(setup_ch, some(err_data));
642 643
                      }
                    }
644
                  }
B
Brian Anderson 已提交
645
                  _ => {
646
                    log(debug, ~"failure to uv_tcp_bind");
647
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
648
                    core::comm::send(setup_ch, some(err_data));
649 650 651
                  }
                }
              }
B
Brian Anderson 已提交
652
              _ => {
653
                log(debug, ~"failure to uv_tcp_init");
654
                let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
655
                core::comm::send(setup_ch, some(err_data));
656 657
              }
            }
658 659
        };
        setup_ch.recv()
660
    };
661
    match setup_result {
B
Brian Anderson 已提交
662
      some(err_data) => {
663
        do iotask::interact(iotask) |loop_ptr| {
664 665
            log(debug, fmt!{"tcp::listen post-kill recv hl interact %?",
                            loop_ptr});
666 667 668 669
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
        stream_closed_po.recv();
670
        match err_data.err_name {
B
Brian Anderson 已提交
671
          ~"EACCES" => {
672
            log(debug, ~"Got EACCES error");
673 674
            result::err(access_denied)
          }
B
Brian Anderson 已提交
675
          ~"EADDRINUSE" => {
676
            log(debug, ~"Got EADDRINUSE error");
677
            result::err(address_in_use)
678
          }
B
Brian Anderson 已提交
679
          _ => {
680 681
            log(debug, fmt!{"Got '%s' '%s' libuv error",
                            err_data.err_name, err_data.err_msg});
682 683
            result::err(
                generic_listen_err(err_data.err_name, err_data.err_msg))
684 685 686
          }
        }
      }
B
Brian Anderson 已提交
687
      none => {
688
        on_establish_cb(kill_ch);
E
Eric Holk 已提交
689
        let kill_result = core::comm::recv(kill_po);
B
Brian Anderson 已提交
690
        do iotask::interact(iotask) |loop_ptr| {
691 692
            log(debug, fmt!{"tcp::listen post-kill recv hl interact %?",
                            loop_ptr});
693 694 695
            (*server_data_ptr).active = false;
            uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
        };
696
        stream_closed_po.recv();
697
        match kill_result {
698
          // some failure post bind/listen
B
Brian Anderson 已提交
699 700
          some(err_data) => result::err(generic_listen_err(err_data.err_name,
                                                           err_data.err_msg)),
701
          // clean exit
B
Brian Anderson 已提交
702
          none => result::ok(())
703 704 705 706 707
        }
      }
    }
}

708 709 710 711 712 713 714 715 716 717 718 719 720 721 722
/**
 * Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`.
 *
 * This function takes ownership of a `net::tcp::tcp_socket`, returning it
 * stored within a buffered wrapper, which can be converted to a `io::reader`
 * or `io::writer`
 *
 * # Arguments
 *
 * * `sock` -- a `net::tcp::tcp_socket` that you want to buffer
 *
 * # Returns
 *
 * A buffered wrapper that you can cast as an `io::reader` or `io::writer`
 */
723
fn socket_buf(-sock: tcp_socket) -> tcp_socket_buf {
724
    tcp_socket_buf(@{ sock: sock, mut buf: ~[] })
725 726
}

727
/// Convenience methods extending `net::tcp::tcp_socket`
B
Brian Anderson 已提交
728
impl tcp_socket {
729
    fn read_start() -> result::result<comm::Port<
730
        result::result<~[u8], tcp_err_data>>, tcp_err_data> {
731 732
        read_start(self)
    }
733
    fn read_stop(-read_port:
734
                 comm::Port<result::result<~[u8], tcp_err_data>>) ->
735
        result::result<(), tcp_err_data> {
736
        read_stop(self, read_port)
737
    }
738
    fn read(timeout_msecs: uint) ->
739
        result::result<~[u8], tcp_err_data> {
740 741 742
        read(self, timeout_msecs)
    }
    fn read_future(timeout_msecs: uint) ->
743
        future::Future<result::result<~[u8], tcp_err_data>> {
744 745
        read_future(self, timeout_msecs)
    }
746
    fn write(raw_write_data: ~[u8])
747 748 749
        -> result::result<(), tcp_err_data> {
        write(self, raw_write_data)
    }
750
    fn write_future(raw_write_data: ~[u8])
751
        -> future::Future<result::result<(), tcp_err_data>> {
752 753
        write_future(self, raw_write_data)
    }
754
}
755

756
/// Implementation of `io::reader` trait for a buffered `net::tcp::tcp_socket`
757
impl @tcp_socket_buf: io::Reader {
758 759 760 761
    fn read(buf: &[mut u8], len: uint) -> uint {
        // Loop until our buffer has enough data in it for us to read from.
        while self.data.buf.len() < len {
            let read_result = read(self.data.sock, 0u);
762
            if read_result.is_err() {
763
                let err_data = read_result.get_err();
764 765 766 767

                if err_data.err_name == ~"EOF" {
                    break;
                } else {
768 769
                    debug!{"ERROR sock_buf as io::reader.read err %? %?",
                           err_data.err_name, err_data.err_msg};
770

B
Brian Anderson 已提交
771
                    return 0;
772
                }
773 774
            }
            else {
775
                vec::push_all(self.data.buf, result::unwrap(read_result));
776 777
            }
        }
778 779 780 781 782 783 784 785 786 787 788

        let count = uint::min(len, self.data.buf.len());

        let mut data = ~[];
        self.data.buf <-> data;

        vec::u8::memcpy(buf, vec::view(data, 0, data.len()), count);

        vec::push_all(self.data.buf, vec::view(data, count, data.len()));

        count
789 790
    }
    fn read_byte() -> int {
791 792
        let bytes = ~[0];
        if self.read(bytes, 1u) == 0 { fail } else { bytes[0] as int }
793 794
    }
    fn unread_byte(amt: int) {
795
        vec::unshift((*(self.data)).buf, amt as u8);
796 797 798 799
    }
    fn eof() -> bool {
        false // noop
    }
800
    fn seek(dist: int, seek: io::SeekStyle) {
801
        log(debug, fmt!{"tcp_socket_buf seek stub %? %?", dist, seek});
802 803 804 805 806 807 808
        // noop
    }
    fn tell() -> uint {
        0u // noop
    }
}

809
/// Implementation of `io::reader` trait for a buffered `net::tcp::tcp_socket`
810
impl @tcp_socket_buf: io::Writer {
811
    fn write(data: &[const u8]) unsafe {
812 813
        let socket_data_ptr =
            ptr::addr_of(*((*(self.data)).sock).socket_data);
814 815
        let w_result = write_common_impl(socket_data_ptr,
                                        vec::slice(data, 0, vec::len(data)));
816
        if w_result.is_err() {
817
            let err_data = w_result.get_err();
818 819
            log(debug, fmt!{"ERROR sock_buf as io::writer.writer err: %? %?",
                             err_data.err_name, err_data.err_msg});
820
        }
821
    }
822
    fn seek(dist: int, seek: io::SeekStyle) {
823
      log(debug, fmt!{"tcp_socket_buf seek stub %? %?", dist, seek});
824 825 826 827 828 829 830 831
        // noop
    }
    fn tell() -> uint {
        0u
    }
    fn flush() -> int {
        0
    }
832 833
    fn get_type() -> io::WriterType {
        io::File
834
    }
835 836
}

837 838
// INTERNAL API

839
/// Closes the libuv stream behind `socket_data` on the I/O task, blocks
/// until the close callback has signalled, then frees the stream handle.
fn tear_down_socket_data(socket_data: @tcp_socket_data) unsafe {
    let done_po = core::comm::port::<()>();
    // This record must stay alive until the close callback has fired: the
    // callback reaches it through the raw pointer stored on the handle.
    let close_data = {
        closed_ch: core::comm::chan(done_po)
    };
    let close_data_ptr = ptr::addr_of(close_data);
    let stream_handle_ptr = socket_data.stream_handle_ptr;
    do iotask::interact(socket_data.iotask) |loop_ptr| {
        debug!{"interact dtor for tcp_socket stream %? loop %?",
               stream_handle_ptr, loop_ptr};
        uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                       close_data_ptr);
        uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
    };
    // wait for tcp_socket_dtor_close_cb
    core::comm::recv(done_po);
    debug!{"about to free socket_data at %?", socket_data};
    rustrt::rust_uv_current_kernel_free(
        stream_handle_ptr as *libc::c_void);
    log(debug, ~"exiting dtor for tcp_socket");
}

861 862
// shared implementation for tcp::read
fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
863
    -> result::result<~[u8],tcp_err_data> unsafe {
864
    log(debug, ~"starting tcp::read");
865
    let iotask = (*socket_data).iotask;
866
    let rs_result = read_start_common_impl(socket_data);
867
    if result::is_err(rs_result) {
868 869 870 871
        let err_data = result::get_err(rs_result);
        result::err(err_data)
    }
    else {
872
        log(debug, ~"tcp::read before recv_timeout");
873 874
        let read_result = if timeout_msecs > 0u {
            timer::recv_timeout(
875
               iotask, timeout_msecs, result::get(rs_result))
876
        } else {
E
Eric Holk 已提交
877
            some(core::comm::recv(result::get(rs_result)))
878
        };
879
        log(debug, ~"tcp::read after recv_timeout");
880
        match read_result {
B
Brian Anderson 已提交
881
          none => {
882
            log(debug, ~"tcp::read: timed out..");
883
            let err_data = {
884 885
                err_name: ~"TIMEOUT",
                err_msg: ~"req timed out"
886 887 888 889
            };
            read_stop_common_impl(socket_data);
            result::err(err_data)
          }
B
Brian Anderson 已提交
890
          some(data_result) => {
891
            log(debug, ~"tcp::read got data");
892 893 894 895 896 897 898 899 900 901 902
            read_stop_common_impl(socket_data);
            data_result
          }
        }
    }
}

// Shared impl for read_stop.
//
// Calls uv_read_stop for the socket's stream on the I/O task and blocks
// until that call has reported back, returning its error (if any).
fn read_stop_common_impl(socket_data: *tcp_socket_data) ->
    result::result<(), tcp_err_data> unsafe {
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
    let stop_po = core::comm::port::<option<tcp_err_data>>();
    let stop_ch = core::comm::chan(stop_po);
    do iotask::interact((*socket_data).iotask) |loop_ptr| {
        log(debug, ~"in interact cb for tcp::read_stop");
        let status = uv::ll::read_stop(
            stream_handle_ptr as *uv::ll::uv_stream_t);
        if status == 0i32 {
            log(debug, ~"successfully called uv_read_stop");
            core::comm::send(stop_ch, none);
        }
        else {
            log(debug, ~"failure in calling uv_read_stop");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            core::comm::send(stop_ch, some(err_data.to_tcp_err()));
        }
    };
    // block until the interact callback has reported the outcome
    let outcome = core::comm::recv(stop_po);
    match outcome {
      none => result::ok(()),
      some(err_data) => result::err(err_data.to_tcp_err())
    }
}

// shared impl for read_start
fn read_start_common_impl(socket_data: *tcp_socket_data)
927
    -> result::result<comm::Port<
928
        result::result<~[u8], tcp_err_data>>, tcp_err_data> unsafe {
929
    let stream_handle_ptr = (*socket_data).stream_handle_ptr;
E
Eric Holk 已提交
930 931
    let start_po = core::comm::port::<option<uv::ll::uv_err_data>>();
    let start_ch = core::comm::chan(start_po);
932
    log(debug, ~"in tcp::read_start before interact loop");
B
Brian Anderson 已提交
933
    do iotask::interact((*socket_data).iotask) |loop_ptr| {
934
        log(debug, fmt!{"in tcp::read_start interact cb %?", loop_ptr});
935
        match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
936 937
                               on_alloc_cb,
                               on_tcp_read_cb) {
B
Brian Anderson 已提交
938
          0i32 => {
939
            log(debug, ~"success doing uv_read_start");
E
Eric Holk 已提交
940
            core::comm::send(start_ch, none);
941
          }
B
Brian Anderson 已提交
942
          _ => {
943
            log(debug, ~"error attempting uv_read_start");
944
            let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
945
            core::comm::send(start_ch, some(err_data));
946 947 948
          }
        }
    };
E
Eric Holk 已提交
949
    match core::comm::recv(start_po) {
B
Brian Anderson 已提交
950 951
      some(err_data) => result::err(err_data.to_tcp_err()),
      none => result::ok((*socket_data).reader_po)
952 953 954
    }
}

955 956
// helper to convert a "class" vector of [u8] to a *[uv::ll::uv_buf_t]

957 958
// shared implementation used by write and write_future
fn write_common_impl(socket_data_ptr: *tcp_socket_data,
959
                     raw_write_data: ~[u8])
960 961 962 963
    -> result::result<(), tcp_err_data> unsafe {
    let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
    let stream_handle_ptr =
        (*socket_data_ptr).stream_handle_ptr;
964
    let write_buf_vec =  ~[ uv::ll::buf_init(
965
        vec::unsafe::to_ptr(raw_write_data),
966
        vec::len(raw_write_data)) ];
967
    let write_buf_vec_ptr = ptr::addr_of(write_buf_vec);
E
Eric Holk 已提交
968
    let result_po = core::comm::port::<tcp_write_result>();
969
    let write_data = {
E
Eric Holk 已提交
970
        result_ch: core::comm::chan(result_po)
971 972
    };
    let write_data_ptr = ptr::addr_of(write_data);
B
Brian Anderson 已提交
973
    do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| {
974
        log(debug, fmt!{"in interact cb for tcp::write %?", loop_ptr});
975
        match uv::ll::write(write_req_ptr,
976 977 978
                          stream_handle_ptr,
                          write_buf_vec_ptr,
                          tcp_write_complete_cb) {
B
Brian Anderson 已提交
979
          0i32 => {
980
            log(debug, ~"uv_write() invoked successfully");
981 982
            uv::ll::set_data_for_req(write_req_ptr, write_data_ptr);
          }
B
Brian Anderson 已提交
983
          _ => {
984
            log(debug, ~"error invoking uv_write()");
985
            let err_data = uv::ll::get_last_err_data(loop_ptr);
E
Eric Holk 已提交
986
            core::comm::send((*write_data_ptr).result_ch,
987 988 989 990
                       tcp_write_error(err_data.to_tcp_err()));
          }
        }
    };
991 992 993 994
    // FIXME (#2656): Instead of passing unsafe pointers to local data,
    // and waiting here for the write to complete, we should transfer
    // ownership of everything to the I/O task and let it deal with the
    // aftermath, so we don't have to sit here blocking.
E
Eric Holk 已提交
995
    match core::comm::recv(result_po) {
B
Brian Anderson 已提交
996 997
      tcp_write_success => result::ok(()),
      tcp_write_error(err_data) => result::err(err_data.to_tcp_err())
998 999 1000
    }
}

1001 1002 1003 1004
/// Wrapper around a raw libuv TCP stream handle pointer.
/// NOTE(review): presumably the handle of an incoming, not yet accepted
/// connection, as passed to `accept` -- confirm against the public API.
enum tcp_new_connection {
    new_tcp_conn(*uv::ll::uv_tcp_t)
}

1005
/// State shared between a listening server and its libuv callbacks; the
/// callbacks recover it from the server handle's data slot.
type tcp_listen_fc_data = {
    // raw handle of the listening stream
    server_stream_ptr: *uv::ll::uv_tcp_t,
    // signalled by `tcp_lfc_close_cb` once the server stream is closed
    stream_closed_ch: comm::Chan<()>,
    // receives the error when an incoming connection reports a failure
    kill_ch: comm::Chan<option<tcp_err_data>>,
    // invoked by `tcp_lfc_on_connection_cb` for each successful connection
    on_connect_cb: fn~(*uv::ll::uv_tcp_t),
    // I/O task handle stored with the listener
    iotask: iotask,
    // while false, `tcp_lfc_on_connection_cb` ignores connection events
    mut active: bool
};

G
Graydon Hoare 已提交
1014
/// libuv close callback for a listening stream: signals
/// `stream_closed_ch` of the listener data attached to `handle`.
extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let listener_data = uv::ll::get_data_for_uv_handle(handle)
        as *tcp_listen_fc_data;
    let closed_ch = (*listener_data).stream_closed_ch;
    core::comm::send(closed_ch, ());
}

G
Graydon Hoare 已提交
1020
/// libuv connection callback for a listening stream.
///
/// Does nothing once the listener is marked inactive. Otherwise a zero
/// `status` invokes the stored `on_connect_cb` with the server handle;
/// any other status sends the loop's last error on `kill_ch` and marks
/// the listener inactive.
extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
                                     status: libc::c_int) unsafe {
    let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
        as *tcp_listen_fc_data;
    let kill_ch = (*server_data_ptr).kill_ch;
    if !(*server_data_ptr).active {
        return;
    }
    if status == 0i32 {
        (*server_data_ptr).on_connect_cb(handle)
    }
    else {
        let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
        let failure = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
        core::comm::send(kill_ch, some(failure));
        (*server_data_ptr).active = false;
    }
}

1039
/// Allocates storage for one `uv_tcp_t` through the runtime's kernel
/// allocator, sized by `rust_uv_helper_uv_tcp_t_size`. The result is not
/// checked for null here.
fn malloc_uv_tcp_t() -> *uv::ll::uv_tcp_t unsafe {
    let handle_size = rustrt::rust_uv_helper_uv_tcp_t_size();
    let raw_mem = rustrt::rust_uv_current_kernel_malloc(handle_size);
    raw_mem as *uv::ll::uv_tcp_t
}

1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054
/// Outcome of a connection attempt: the connected socket or the error.
/// NOTE(review): not referenced in the visible part of this file.
enum tcp_connect_result {
    tcp_connected(tcp_socket),
    tcp_connect_error(tcp_err_data)
}

/// Outcome of a write, sent over the result channel that
/// `write_common_impl` waits on.
enum tcp_write_result {
    tcp_write_success,
    tcp_write_error(tcp_err_data)
}

enum tcp_read_start_result {
1055
    tcp_read_start_success(comm::Port<tcp_read_result>),
1056 1057 1058 1059
    tcp_read_start_error(tcp_err_data)
}

enum tcp_read_result {
1060
    tcp_read_data(~[u8]),
1061 1062 1063 1064
    tcp_read_done,
    tcp_read_err(tcp_err_data)
}

1065
trait to_tcp_err {
1066 1067 1068
    fn to_tcp_err() -> tcp_err_data;
}

B
Brian Anderson 已提交
1069
/// Converts a libuv error record by copying its name and message.
impl uv::ll::uv_err_data: to_tcp_err {
    fn to_tcp_err() -> tcp_err_data {
        // field-for-field copy into the tcp error record
        { err_name: self.err_name, err_msg: self.err_msg }
    }
}

G
Graydon Hoare 已提交
1075
/// libuv read callback for a TCP stream.
///
/// An `nread` of -1 forwards the loop's last error on the socket's reader
/// channel, 0 is ignored, and any other value forwards a copy of the
/// first `nread` bytes of `buf`. The buffer's base is freed in every case.
extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
                    nread: libc::ssize_t,
                    ++buf: uv::ll::uv_buf_t) unsafe {
    debug!{"entering on_tcp_read_cb stream: %? nread: %?",
           stream, nread};
    let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
    let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
        as *tcp_socket_data;
    let n_read = nread as int;
    if n_read == -1 {
        // incoming err.. probably eof
        let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
        debug!{"on_tcp_read_cb: incoming err.. name %? msg %?",
               err_data.err_name, err_data.err_msg};
        let reader_ch = (*socket_data_ptr).reader_ch;
        core::comm::send(reader_ch, result::err(err_data));
    }
    else if n_read != 0 {
        // we have data (a zero count means an unneeded buf: do nothing)
        debug!{"tcp on_read_cb nread: %d", n_read};
        let reader_ch = (*socket_data_ptr).reader_ch;
        let buf_base = uv::ll::get_base_from_buf(buf);
        let new_bytes = vec::unsafe::from_buf(buf_base, nread as uint);
        core::comm::send(reader_ch, result::ok(new_bytes));
    }
    uv::ll::free_base_of_buf(buf);
    log(debug, ~"exiting on_tcp_read_cb");
}

G
Graydon Hoare 已提交
1108
extern fn on_alloc_cb(handle: *libc::c_void,
1109
                     ++suggested_size: size_t)
1110
    -> uv::ll::uv_buf_t unsafe {
1111
    log(debug, ~"tcp read on_alloc_cb!");
1112
    let char_ptr = uv::ll::malloc_buf_base_of(suggested_size);
1113
    log(debug, fmt!{"tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
1114 1115
                     handle,
                     char_ptr as uint,
1116
                     suggested_size as uint});
1117
    uv::ll::buf_init(char_ptr, suggested_size as uint)
1118
}
1119 1120

/// Data attached to a stream handle while a socket is being torn down.
type tcp_socket_close_data = {
    // signalled by `tcp_socket_dtor_close_cb` once the handle is closed
    closed_ch: comm::Chan<()>
};

G
Graydon Hoare 已提交
1124
/// libuv close callback used by `tear_down_socket_data`: signals the
/// `closed_ch` found in the close data attached to `handle`.
extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let close_data_ptr = uv::ll::get_data_for_uv_handle(handle)
        as *tcp_socket_close_data;
    core::comm::send((*close_data_ptr).closed_ch, ());
    log(debug, ~"tcp_socket_dtor_close_cb exiting..");
}

G
Graydon Hoare 已提交
1132
/// libuv write-completion callback: reports success, or the loop's last
/// error, on the result channel stored in the request's data slot.
extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
                              status: libc::c_int) unsafe {
    let write_data_ptr = uv::ll::get_data_for_req(write_req)
        as *write_req_data;
    match status {
      0i32 => {
        log(debug, ~"successful write complete");
        core::comm::send((*write_data_ptr).result_ch, tcp_write_success);
      }
      _ => {
        // the error is looked up on the loop that owns the request's stream
        let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
            write_req);
        let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        log(debug, ~"failure to write");
        core::comm::send((*write_data_ptr).result_ch,
                         tcp_write_error(err_data));
      }
    }
}

/// Data attached to a write request for `tcp_write_complete_cb`.
type write_req_data = {
    // receives the outcome of the write
    result_ch: comm::Chan<tcp_write_result>
};

J
Jeff Olson 已提交
1154
/// Data attached to a connect request (and, after a failed connect, to
/// the stream handle) for the connect callbacks.
type connect_req_data = {
    // receives the outcome from `tcp_connect_on_connect_cb`
    result_ch: comm::Chan<conn_attempt>,
    // signalled by `stream_error_close_cb` once the stream is closed
    closed_signal_ch: comm::Chan<()>
};

G
Graydon Hoare 已提交
1159
/// libuv close callback used after a failed connect: signals
/// `closed_signal_ch` of the connect data attached to `handle`.
extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let conn_data_ptr = uv::ll::get_data_for_uv_handle(handle)
        as *connect_req_data;
    let closed_signal_ch = (*conn_data_ptr).closed_signal_ch;
    core::comm::send(closed_signal_ch, ());
    debug!{"exiting steam_error_close_cb for %?", handle};
}

G
Graydon Hoare 已提交
1166
/// libuv close callback for a client handle; it only logs the handle.
extern fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    debug!{"closed client tcp handle %?", handle};
}

G
Graydon Hoare 已提交
1170
/// libuv connect callback for an outgoing connection.
///
/// A zero `status` reports `conn_success` on the request's result
/// channel. Any other status reports `conn_failure` with the loop's last
/// error, then closes the stream with `stream_error_close_cb` after
/// attaching the connect data to the stream handle.
extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
                                   status: libc::c_int) unsafe {
    let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
                      as *connect_req_data);
    let result_ch = (*conn_data_ptr).result_ch;
    debug!{"tcp_connect result_ch %?", result_ch};
    let tcp_stream_ptr =
        uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
    if status == 0i32 {
        log(debug, ~"successful tcp connection!");
        core::comm::send(result_ch, conn_success);
    }
    else {
        log(debug, ~"error in tcp_connect_on_connect_cb");
        let loop_ptr = uv::ll::get_loop_for_uv_handle(tcp_stream_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        debug!{"err_data %? %?", err_data.err_name,
               err_data.err_msg};
        core::comm::send(result_ch, conn_failure(err_data));
        // the close callback finds the connect data on the stream handle
        uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
                                       conn_data_ptr);
        uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
    }
    log(debug, ~"leaving tcp_connect_on_connect_cb");
}

/// Outcome reported by `tcp_connect_on_connect_cb` on the connect
/// request's result channel.
enum conn_attempt {
    conn_success,
    conn_failure(uv::ll::uv_err_data)
}

/// Per-socket state shared between a `tcp_socket` and its libuv callbacks.
type tcp_socket_data = {
    // port handed out by `read_start_common_impl`
    reader_po: comm::Port<result::result<~[u8], tcp_err_data>>,
    // channel on which `on_tcp_read_cb` delivers data and errors
    reader_ch: comm::Chan<result::result<~[u8], tcp_err_data>>,
    // raw libuv stream handle; freed by `tear_down_socket_data`
    stream_handle_ptr: *uv::ll::uv_tcp_t,
    // storage for the libuv connect request
    connect_req: uv::ll::uv_connect_t,
    // storage for the libuv write request used by `write_common_impl`
    write_req: uv::ll::uv_write_t,
    // I/O task on which all libuv calls for this socket are made
    iotask: iotask
};

1212 1213
/// Backing state of a `tcp_socket_buf`.
type tcp_buffered_socket_data = {
    // the wrapped socket
    sock: tcp_socket,
    // bytes read from the socket but not yet handed to a reader
    mut buf: ~[u8]
};
J
Jeff Olson 已提交
1216

1217
//#[cfg(test)]
1218
mod test {
1219
    // FIXME don't run on fbsd or linux 32 bit (#2064)
1220 1221 1222 1223 1224 1225 1226 1227 1228 1229
    #[cfg(target_os="win32")]
    #[cfg(target_os="darwin")]
    #[cfg(target_os="linux")]
    mod tcp_ipv4_server_and_client_test {
        // Test entry points for 64-bit x86 targets; each one runs the
        // matching `impl_gl_*` function.
        #[cfg(target_arch="x86_64")]
        mod impl64 {
            #[test]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }

            #[test]
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }

            #[test]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }

            #[test]
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
            }

            #[test]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
            }

        }

        // Test entry points for 32-bit x86 targets; all of them are
        // ignored on linux.
        #[cfg(target_arch="x86")]
        mod impl32 {
            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_and_client_ipv4() unsafe {
                impl_gl_tcp_ipv4_server_and_client();
            }

            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
                impl_gl_tcp_ipv4_client_error_connection_refused();
            }

            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_server_address_in_use() unsafe {
                impl_gl_tcp_ipv4_server_address_in_use();
            }

            #[test]
            #[ignore(cfg(target_os = "linux"))]
            #[ignore(cfg(windows), reason = "deadlocking bots")]
            fn test_gl_tcp_server_access_denied() unsafe {
                impl_gl_tcp_ipv4_server_access_denied();
            }

            #[test]
            #[ignore(cfg(target_os = "linux"))]
            fn test_gl_tcp_ipv4_server_client_reader_writer() {
                impl_gl_tcp_ipv4_server_client_reader_writer();
            }
        }
    }
1278
    fn impl_gl_tcp_ipv4_server_and_client() {
1279
        let hl_loop = uv::global_loop::get();
1280
        let server_ip = ~"127.0.0.1";
1281
        let server_port = 8888u;
1282 1283
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1284

E
Eric Holk 已提交
1285 1286
        let server_result_po = core::comm::port::<~str>();
        let server_result_ch = core::comm::chan(server_result_po);
1287

E
Eric Holk 已提交
1288 1289
        let cont_po = core::comm::port::<()>();
        let cont_ch = core::comm::chan(cont_po);
1290
        // server
1291
        do task::spawn_sched(task::ManualThreads(1u)) {
B
Brian Anderson 已提交
1292
            let actual_req = do comm::listen |server_ch| {
1293 1294 1295 1296
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
1297
                    server_ch,
1298 1299
                    cont_ch,
                    hl_loop)
1300 1301 1302
            };
            server_result_ch.send(actual_req);
        };
E
Eric Holk 已提交
1303
        core::comm::recv(cont_po);
1304
        // client
1305
        log(debug, ~"server started, firing up client..");
E
Eric Holk 已提交
1306
        let actual_resp_result = do core::comm::listen |client_ch| {
1307 1308 1309 1310
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1311 1312
                client_ch,
                hl_loop)
1313
        };
1314
        assert actual_resp_result.is_ok();
1315
        let actual_resp = actual_resp_result.get();
E
Eric Holk 已提交
1316
        let actual_req = core::comm::recv(server_result_po);
1317 1318 1319 1320
        log(debug, fmt!{"REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req});
        log(debug, fmt!{"RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp});
1321 1322
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
1323
    }
1324
    fn impl_gl_tcp_ipv4_client_error_connection_refused() {
1325
        let hl_loop = uv::global_loop::get();
1326
        let server_ip = ~"127.0.0.1";
1327
        let server_port = 8889u;
1328
        let expected_req = ~"ping";
1329
        // client
1330
        log(debug, ~"firing up client..");
E
Eric Holk 已提交
1331
        let actual_resp_result = do core::comm::listen |client_ch| {
1332 1333 1334 1335 1336 1337 1338
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
                client_ch,
                hl_loop)
        };
1339
        match actual_resp_result.get_err() {
B
Brian Anderson 已提交
1340 1341
          connection_refused => (),
          _ => fail ~"unknown error.. expected connection_refused"
1342 1343
        }
    }
1344 1345
    fn impl_gl_tcp_ipv4_server_address_in_use() {
        let hl_loop = uv::global_loop::get();
1346
        let server_ip = ~"127.0.0.1";
1347
        let server_port = 8890u;
1348 1349
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1350

E
Eric Holk 已提交
1351 1352
        let server_result_po = core::comm::port::<~str>();
        let server_result_ch = core::comm::chan(server_result_po);
1353

E
Eric Holk 已提交
1354 1355
        let cont_po = core::comm::port::<()>();
        let cont_ch = core::comm::chan(cont_po);
1356
        // server
1357
        do task::spawn_sched(task::ManualThreads(1u)) {
B
Brian Anderson 已提交
1358
            let actual_req = do comm::listen |server_ch| {
1359
                run_tcp_test_server(
1360 1361 1362 1363
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
1364 1365
                    cont_ch,
                    hl_loop)
1366 1367 1368
            };
            server_result_ch.send(actual_req);
        };
E
Eric Holk 已提交
1369
        core::comm::recv(cont_po);
1370 1371 1372 1373 1374 1375
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
        // client.. just doing this so that the first server tears down
1376
        log(debug, ~"server started, firing up client..");
E
Eric Holk 已提交
1377
        do core::comm::listen |client_ch| {
1378 1379 1380 1381
            run_tcp_test_client(
                server_ip,
                server_port,
                expected_req,
1382 1383
                client_ch,
                hl_loop)
1384
        };
1385
        match listen_err {
B
Brian Anderson 已提交
1386
          address_in_use => {
1387 1388
            assert true;
          }
B
Brian Anderson 已提交
1389
          _ => {
1390
            fail ~"expected address_in_use listen error,"+
B
Brian Anderson 已提交
1391
                ~"but got a different error varient. check logs.";
1392 1393 1394 1395 1396
          }
        }
    }
    fn impl_gl_tcp_ipv4_server_access_denied() {
        let hl_loop = uv::global_loop::get();
1397
        let server_ip = ~"127.0.0.1";
1398 1399 1400 1401 1402 1403
        let server_port = 80u;
        // this one should fail..
        let listen_err = run_tcp_test_server_fail(
                            server_ip,
                            server_port,
                            hl_loop);
1404
        match listen_err {
B
Brian Anderson 已提交
1405
          access_denied => {
1406 1407
            assert true;
          }
B
Brian Anderson 已提交
1408
          _ => {
1409 1410
            fail ~"expected address_in_use listen error,"+
                      ~"but got a different error varient. check logs.";
1411 1412 1413
          }
        }
    }
1414 1415
    fn impl_gl_tcp_ipv4_server_client_reader_writer() {
        let iotask = uv::global_loop::get();
1416
        let server_ip = ~"127.0.0.1";
1417
        let server_port = 8891u;
1418 1419
        let expected_req = ~"ping";
        let expected_resp = ~"pong";
1420

E
Eric Holk 已提交
1421 1422
        let server_result_po = core::comm::port::<~str>();
        let server_result_ch = core::comm::chan(server_result_po);
1423

E
Eric Holk 已提交
1424 1425
        let cont_po = core::comm::port::<()>();
        let cont_ch = core::comm::chan(cont_po);
1426
        // server
1427
        do task::spawn_sched(task::ManualThreads(1u)) {
1428
            let actual_req = do comm::listen |server_ch| {
1429 1430 1431 1432 1433 1434 1435 1436 1437 1438
                run_tcp_test_server(
                    server_ip,
                    server_port,
                    expected_resp,
                    server_ch,
                    cont_ch,
                    iotask)
            };
            server_result_ch.send(actual_req);
        };
E
Eric Holk 已提交
1439
        core::comm::recv(cont_po);
1440 1441 1442
        // client
        let server_addr = ip::v4::parse_addr(server_ip);
        let conn_result = connect(server_addr, server_port, iotask);
1443
        if result::is_err(conn_result) {
1444 1445 1446
            assert false;
        }
        let sock_buf = @socket_buf(result::unwrap(conn_result));
1447
        buf_write(sock_buf as io::Writer, expected_req);
1448 1449

        // so contrived!
1450
        let actual_resp = do str::as_bytes(expected_resp) |resp_buf| {
1451
            buf_read(sock_buf as io::Reader,
1452 1453
                     vec::len(resp_buf))
        };
1454

E
Eric Holk 已提交
1455
        let actual_req = core::comm::recv(server_result_po);
1456 1457 1458 1459
        log(debug, fmt!{"REQ: expected: '%s' actual: '%s'",
                       expected_req, actual_req});
        log(debug, fmt!{"RESP: expected: '%s' actual: '%s'",
                       expected_resp, actual_resp});
1460 1461 1462
        assert str::contains(actual_req, expected_req);
        assert str::contains(actual_resp, expected_resp);
    }
1463

1464
    fn buf_write(+w: io::Writer, val: ~str) {
1465
        log(debug, fmt!{"BUF_WRITE: val len %?", str::len(val)});
1466
        do str::byte_slice(val) |b_slice| {
1467 1468
            log(debug, fmt!{"BUF_WRITE: b_slice len %?",
                            vec::len(b_slice)});
1469
            w.write(b_slice)
1470 1471 1472
        }
    }

1473
    // Reads `len` bytes from `r` via `read_bytes` and returns them as a
    // string.
    fn buf_read(+r: io::Reader, len: uint) -> ~str {
        let received = r.read_bytes(len);
        debug!{"in buf_read.. new_bytes len: %?",
               vec::len(received)};
        str::from_bytes(received)
    }
1479

1480
    fn run_tcp_test_server(server_ip: ~str, server_port: uint, resp: ~str,
1481 1482
                          server_ch: comm::Chan<~str>,
                          cont_ch: comm::Chan<()>,
1483
                          iotask: iotask) -> ~str {
1484 1485 1486
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1487
            |kill_ch| {
1488 1489
                log(debug, fmt!{"establish_cb %?",
                    kill_ch});
E
Eric Holk 已提交
1490
                core::comm::send(cont_ch, ());
1491 1492 1493
            },
            // risky to run this on the loop, but some users
            // will want the POWER
1494
            |new_conn, kill_ch| {
1495
            log(debug, ~"SERVER: new connection!");
1496
            do comm::listen |cont_ch| {
1497
                do task::spawn_sched(task::ManualThreads(1u)) {
1498
                    log(debug, ~"SERVER: starting worker for new req");
1499 1500

                    let accept_result = accept(new_conn);
1501
                    log(debug, ~"SERVER: after accept()");
1502
                    if result::is_err(accept_result) {
1503
                        log(debug, ~"SERVER: error accept connection");
1504
                        let err_data = result::get_err(accept_result);
E
Eric Holk 已提交
1505
                        core::comm::send(kill_ch, some(err_data));
1506
                        log(debug,
1507
                            ~"SERVER/WORKER: send on err cont ch");
1508 1509 1510 1511
                        cont_ch.send(());
                    }
                    else {
                        log(debug,
1512
                            ~"SERVER/WORKER: send on cont ch");
1513 1514
                        cont_ch.send(());
                        let sock = result::unwrap(accept_result);
1515 1516
                        log(debug, ~"SERVER: successfully accepted"+
                            ~"connection!");
1517
                        let received_req_bytes = read(sock, 0u);
1518
                        match received_req_bytes {
B
Brian Anderson 已提交
1519
                          result::ok(data) => {
1520
                            log(debug, ~"SERVER: got REQ str::from_bytes..");
1521 1522
                            log(debug, fmt!{"SERVER: REQ data len: %?",
                                            vec::len(data)});
1523 1524
                            server_ch.send(
                                str::from_bytes(data));
1525
                            log(debug, ~"SERVER: before write");
1526
                            tcp_write_single(sock, str::bytes(resp));
1527
                            log(debug, ~"SERVER: after write.. die");
E
Eric Holk 已提交
1528
                            core::comm::send(kill_ch, none);
1529
                          }
B
Brian Anderson 已提交
1530
                          result::err(err_data) => {
1531 1532
                            log(debug, fmt!{"SERVER: error recvd: %s %s",
                                err_data.err_name, err_data.err_msg});
E
Eric Holk 已提交
1533
                            core::comm::send(kill_ch, some(err_data));
1534
                            server_ch.send(~"");
1535
                          }
1536
                        }
1537
                        log(debug, ~"SERVER: worker spinning down");
1538
                    }
1539
                }
1540
                log(debug, ~"SERVER: waiting to recv on cont_ch");
1541 1542
                cont_ch.recv()
            };
1543
            log(debug, ~"SERVER: recv'd on cont_ch..leaving listen cb");
1544 1545 1546
        });
        // err check on listen_result
        if result::is_err(listen_result) {
1547
            match result::get_err(listen_result) {
B
Brian Anderson 已提交
1548
              generic_listen_err(name, msg) => {
1549 1550
                fail fmt!{"SERVER: exited abnormally name %s msg %s",
                                name, msg};
1551
              }
B
Brian Anderson 已提交
1552
              access_denied => {
1553
                fail ~"SERVER: exited abnormally, got access denied..";
1554
              }
B
Brian Anderson 已提交
1555
              address_in_use => {
1556
                fail ~"SERVER: exited abnormally, got address in use...";
1557
              }
1558
            }
1559
        }
1560
        let ret_val = server_ch.recv();
B
Brian Anderson 已提交
1561
        log(debug, fmt!{"SERVER: exited and got return val: '%s'", ret_val});
1562 1563 1564
        ret_val
    }

1565
    fn run_tcp_test_server_fail(server_ip: ~str, server_port: uint,
1566 1567 1568 1569
                          iotask: iotask) -> tcp_listen_err_data {
        let server_ip_addr = ip::v4::parse_addr(server_ip);
        let listen_result = listen(server_ip_addr, server_port, 128u, iotask,
            // on_establish_cb -- called when listener is set up
1570
            |kill_ch| {
1571 1572
                log(debug, fmt!{"establish_cb %?",
                    kill_ch});
1573
            },
1574
            |new_conn, kill_ch| {
1575 1576
                fail fmt!{"SERVER: shouldn't be called.. %? %?",
                           new_conn, kill_ch};
1577 1578
        });
        // err check on listen_result
1579
        if result::is_err(listen_result) {
1580 1581 1582
            result::get_err(listen_result)
        }
        else {
1583
            fail ~"SERVER: did not fail as expected"
1584 1585 1586
        }
    }

1587
    fn run_tcp_test_client(server_ip: ~str, server_port: uint, resp: ~str,
1588
                          client_ch: comm::Chan<~str>,
1589
                          iotask: iotask) -> result::result<~str,
1590
                                                    tcp_connect_err_data> {
1591
        let server_ip_addr = ip::v4::parse_addr(server_ip);
1592

1593
        log(debug, ~"CLIENT: starting..");
1594
        let connect_result = connect(server_ip_addr, server_port, iotask);
1595
        if result::is_err(connect_result) {
1596
            log(debug, ~"CLIENT: failed to connect");
1597
            let err_data = result::get_err(connect_result);
1598
            err(err_data)
1599
        }
1600 1601 1602 1603
        else {
            let sock = result::unwrap(connect_result);
            let resp_bytes = str::bytes(resp);
            tcp_write_single(sock, resp_bytes);
1604
            let read_result = sock.read(0u);
1605
            if read_result.is_err() {
1606 1607
                log(debug, ~"CLIENT: failure to read");
                ok(~"")
1608
            }
1609 1610 1611
            else {
                client_ch.send(str::from_bytes(read_result.get()));
                let ret_val = client_ch.recv();
1612 1613
                log(debug, fmt!{"CLIENT: after client_ch recv ret: '%s'",
                   ret_val});
1614
                ok(ret_val)
1615 1616
            }
        }
1617 1618
    }

1619
    fn tcp_write_single(sock: tcp_socket, val: ~[u8]) {
1620 1621
        let write_result_future = sock.write_future(val);
        let write_result = write_result_future.get();
1622
        if result::is_err(write_result) {
1623
            log(debug, ~"tcp_write_single: write failed!");
1624
            let err_data = result::get_err(write_result);
1625 1626
            log(debug, fmt!{"tcp_write_single err name: %s msg: %s",
                err_data.err_name, err_data.err_msg});
1627
            // meh. torn on what to do here.
1628
            fail ~"tcp_write_single failed";
1629
        }
1630
    }
1631
}