net_tcp.rs 17.7 KB
Newer Older
1 2 3 4
#[doc="
High-level interface to libuv's TCP functionality
"];

J
Jeff Olson 已提交
5 6
import ip = net_ip;

7
export tcp_err_data, tcp_connect_result, tcp_write_result, tcp_read_start_result;
8
export connect, write;
J
Jeff Olson 已提交
9

10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
// Owning handle for a TCP stream. When the resource goes out of scope its
// destructor asks the libuv loop to close the underlying stream handle and
// blocks until `tcp_socket_dtor_close_cb` reports that the close finished.
resource tcp_socket(socket_data: @tcp_socket_data) unsafe {
    let handle_ptr = ptr::addr_of((*socket_data).stream_handle);
    // the close callback signals completion back to this task via done_po
    let done_po = comm::port::<()>();
    let dtor_data = {
        closed_ch: comm::chan(done_po)
    };
    let dtor_data_ptr = ptr::addr_of(dtor_data);
    uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
        log(debug, #fmt("interact dtor for tcp_socket stream %? loop %?",
            handle_ptr, loop_ptr));
        // hand the close callback the channel it has to signal on
        uv::ll::set_data_for_uv_handle(handle_ptr, dtor_data_ptr);
        uv::ll::close(handle_ptr, tcp_socket_dtor_close_cb);
    };
    // dtor_data lives on this stack frame, so wait here until the close
    // callback has used it
    comm::recv(done_po);
    log(debug, "exiting dtor for tcp_socket");
}

29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
// Error information handed back to callers of this module: the name of the
// error and its message, both as strings. Values are produced from
// `uv::ll::uv_err_data` via `to_tcp_err()`.
type tcp_err_data = {
    err_name: str,
    err_msg: str
};

// Conversion of an error value into this module's `tcp_err_data`.
iface to_tcp_err_iface {
    // Returns the receiver's error name and message as a `tcp_err_data`.
    fn to_tcp_err() -> tcp_err_data;
}

// Lets a libuv error record be converted into the `tcp_err_data` that this
// module exposes, copying the name and message across unchanged.
impl of to_tcp_err_iface for uv::ll::uv_err_data {
    fn to_tcp_err() -> tcp_err_data {
        let name = self.err_name;
        let msg = self.err_msg;
        { err_name: name, err_msg: msg }
    }
}

J
Jeff Olson 已提交
44 45
enum tcp_connect_result {
    tcp_connected(tcp_socket),
46
    tcp_connect_error(tcp_err_data)
J
Jeff Olson 已提交
47 48
}

49 50
enum tcp_write_result {
    tcp_write_success,
51
    tcp_write_error(tcp_err_data)
52 53
}

54 55
enum tcp_read_start_result {
    tcp_read_start_success(comm::port<tcp_read_result>),
56
    tcp_read_start_error(tcp_err_data)
57 58 59 60 61
}

enum tcp_read_result {
    tcp_read_data([u8]),
    tcp_read_done,
62
    tcp_read_err(tcp_err_data)
63 64
}

J
Jeff Olson 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
#[doc="
Initiate a client connection over TCP/IP

Blocks the calling task until the connection attempt has either succeeded
or failed. On failure any stream handle that was opened is closed before
this function returns.

# Arguments

* input_ip - The IP address of the remote host (only the ipv4 case is
handled by this function at present)
* port - the unsigned integer of the desired remote host port

# Returns

A `tcp_connect_result`: `tcp_connected` carrying the `tcp_socket` to send
and receive data to/from the remote host on success, or `tcp_connect_error`
carrying the error name and message on failure
"]
fn connect(input_ip: ip::ip_addr, port: uint) -> tcp_connect_result unsafe {
    let result_po = comm::port::<conn_attempt>();
    let closed_signal_po = comm::port::<()>();
    let conn_data = {
        result_ch: comm::chan(result_po),
        closed_signal_ch: comm::chan(closed_signal_po)
    };
    let conn_data_ptr = ptr::addr_of(conn_data);
    let hl_loop = uv::global_loop::get();
    let reader_po = comm::port::<tcp_read_result>();
    let socket_data = @{
        reader_po: reader_po,
        reader_ch: comm::chan(reader_po),
        stream_handle : uv::ll::tcp_t(),
        connect_req : uv::ll::connect_t(),
        write_req : uv::ll::write_t(),
        hl_loop: hl_loop
    };
    log(debug, #fmt("tcp_connect result_ch %?", conn_data.result_ch));
    // get an unsafe representation of our stream_handle_ptr that
    // we can send into the interact cb to be handled in libuv..
    let socket_data_ptr: *tcp_socket_data =
        ptr::addr_of(*socket_data);
    log(debug, #fmt("stream_handl_ptr outside interact %?",
        ptr::addr_of((*socket_data_ptr).stream_handle)));
    uv::hl::interact(hl_loop) {|loop_ptr|
        log(debug, "in interact cb for tcp client connect..");
        let stream_handle_ptr =
            ptr::addr_of((*socket_data_ptr).stream_handle);
        log(debug, #fmt("stream_handl_ptr in interact %?",
            stream_handle_ptr));
        alt uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
          0i32 {
            log(debug, "tcp_init successful");
            alt input_ip {
              ipv4 {
                log(debug, "dealing w/ ipv4 connection..");
                let tcp_addr = ipv4_ip_addr_to_sockaddr_in(input_ip,
                                                           port);
                let tcp_addr_ptr = ptr::addr_of(tcp_addr);
                let connect_req_ptr =
                    ptr::addr_of((*socket_data_ptr).connect_req);
                alt uv::ll::tcp_connect(
                    connect_req_ptr,
                    stream_handle_ptr,
                    tcp_addr_ptr,
                    tcp_connect_on_connect_cb) {
                  0i32 {
                    log(debug, "tcp_connect successful");
                    // reusable data that we'll have for the
                    // duration..
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                               socket_data_ptr);
                    // just so the connect_cb can send the
                    // outcome..
                    uv::ll::set_data_for_req(connect_req_ptr,
                                             conn_data_ptr);
                    log(debug, "leaving tcp_connect interact cb...");
                    // let tcp_connect_on_connect_cb send on
                    // the result_ch, now..
                  }
                  _ {
                    // immediate connect failure.. probably a garbage
                    // ip or somesuch
                    let err_data = uv::ll::get_last_err_data(loop_ptr);
                    // conn_failure carries the raw uv error; it is
                    // converted once, on the receiving side below
                    comm::send((*conn_data_ptr).result_ch,
                               conn_failure(err_data));
                    // stream_error_close_cb signals closed_signal_ch
                    // once the handle has been closed
                    uv::ll::set_data_for_uv_handle(stream_handle_ptr,
                                                   conn_data_ptr);
                    uv::ll::close(stream_handle_ptr, stream_error_close_cb);
                  }
                }
              }
            }
          }
          _ {
            // failure to create a tcp handle
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send((*conn_data_ptr).result_ch,
                       conn_failure(err_data));
            // no handle was opened, so no close callback will ever fire;
            // signal closed_signal_ch here so the failure path below
            // does not block forever waiting for it
            comm::send((*conn_data_ptr).closed_signal_ch, ());
          }
        }
    };
    alt comm::recv(result_po) {
      conn_success {
        log(debug, "tcp::connect - received success on result_po");
        tcp_connected(tcp_socket(socket_data))
      }
      conn_failure(err_data) {
        // conn_data lives on this stack frame; wait until the loop side
        // is finished with it before returning
        comm::recv(closed_signal_po);
        log(debug, "tcp::connect - received failure on result_po");
        tcp_connect_error(err_data.to_tcp_err())
      }
    }
}
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206

#[doc="
Write binary data to a tcp stream

Blocks the calling task until the write has completed or failed.

# Arguments

* sock - the `tcp_socket` to write to
* raw_write_data - a vector of byte vectors; each inner vector is handed to
libuv as one buffer of the write request

# Returns

`tcp_write_success` if the write completed, otherwise `tcp_write_error`
carrying the error name and message
"]
fn write(sock: tcp_socket, raw_write_data: [[u8]]) -> tcp_write_result
    unsafe {
    // the outcome of the write is reported back over this port
    let outcome_po = comm::port::<tcp_write_result>();
    let req_data = {
        result_ch: comm::chan(outcome_po)
    };
    let req_data_ptr = ptr::addr_of(req_data);
    let data_ptr = ptr::addr_of(**sock);
    let handle_ptr = ptr::addr_of((*data_ptr).stream_handle);
    let req_ptr = ptr::addr_of((*data_ptr).write_req);
    // wrap every byte vector in a uv buffer pointing at its contents
    let bufs = iter::map_to_vec(raw_write_data) {|bytes|
        uv::ll::buf_init(vec::unsafe::to_ptr(bytes),
                         vec::len(bytes))
    };
    let bufs_ptr = ptr::addr_of(bufs);
    uv::hl::interact((*data_ptr).hl_loop) {|loop_ptr|
        log(debug, #fmt("in interact cb for tcp::write %?", loop_ptr));
        let status = uv::ll::write(req_ptr,
                                   handle_ptr,
                                   bufs_ptr,
                                   tcp_write_complete_cb);
        alt status {
          0i32 {
            log(debug, "uv_write() invoked successfully");
            // tcp_write_complete_cb reads the result channel from here
            uv::ll::set_data_for_req(req_ptr, req_data_ptr);
          }
          _ {
            log(debug, "error invoking uv_write()");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send((*req_data_ptr).result_ch,
                       tcp_write_error(err_data.to_tcp_err()));
          }
        }
    };
    comm::recv(outcome_po)
}

214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
#[doc="
Begin receiving data on a tcp stream

Blocks the calling task until the request to start reading has been
processed.

# Arguments

* sock - the `tcp_socket` to read from

# Returns

`tcp_read_start_success` carrying the port on which `tcp_read_result`
values for this socket are delivered, or `tcp_read_start_error` carrying the
error name and message if reading could not be started
"]
fn read_start(sock: tcp_socket) -> tcp_read_start_result unsafe {
    // the loop side reports `none` on success or the uv error on failure
    let started_po = comm::port::<option<uv::ll::uv_err_data>>();
    let started_ch = comm::chan(started_po);
    let handle_ptr = ptr::addr_of((**sock).stream_handle);
    uv::hl::interact((**sock).hl_loop) {|loop_ptr|
        log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
        let status = uv::ll::read_start(
            handle_ptr as *uv::ll::uv_stream_t,
            on_alloc_cb,
            on_tcp_read_cb);
        alt status {
          0i32 {
            log(debug, "success doing uv_read_start");
            comm::send(started_ch, none);
          }
          _ {
            log(debug, "error attempting uv_read_start");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send(started_ch, some(err_data));
          }
        }
    };
    alt comm::recv(started_po) {
      none {
        tcp_read_start_success((**sock).reader_po)
      }
      some(err_data) {
        tcp_read_start_error(err_data.to_tcp_err())
      }
    }
}
245

246 247
#[doc="
Stop receiving data on a tcp stream

Blocks the calling task until the request to stop reading has been
processed.

# Arguments

* sock - the `tcp_socket` on which reading should stop

# Returns

`none` if reading was stopped, otherwise `some` carrying the error name and
message
"]
fn read_stop(sock: tcp_socket) -> option<tcp_err_data> unsafe {
    let stream_handle_ptr = ptr::addr_of((**sock).stream_handle);
    // the declared return type now matches what is actually sent over
    // stop_po: an already-converted tcp_err_data
    let stop_po = comm::port::<option<tcp_err_data>>();
    let stop_ch = comm::chan(stop_po);
    uv::hl::interact((**sock).hl_loop) {|loop_ptr|
        log(debug, "in interact cb for tcp::read_stop");
        alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
          0i32 {
            log(debug, "successfully called uv_read_stop");
            comm::send(stop_ch, none);
          }
          _ {
            log(debug, "failure in calling uv_read_stop");
            let err_data = uv::ll::get_last_err_data(loop_ptr);
            comm::send(stop_ch, some(err_data.to_tcp_err()));
          }
        }
    };
    comm::recv(stop_po)
}
266

J
Jeff Olson 已提交
267
// INTERNAL API
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
// libuv read callback for streams started by `read_start`.
//
// * stream - the stream handle; its data slot holds a `*tcp_socket_data`
// * nread - -1 on error, 0 when there is nothing to deliver, otherwise the
//   number of bytes placed in `buf`
// * buf - the buffer handed out by `on_alloc_cb`; it is always freed before
//   returning
//
// Results are forwarded to the socket's `reader_ch`.
crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
                    nread: libc::ssize_t,
                    ++buf: uv::ll::uv_buf_t) unsafe {
    let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
    let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
        as *tcp_socket_data;
    let reader_ch = (*socket_data_ptr).reader_ch;
    alt nread {
      // incoming err.. probably eof
      -1 {
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        comm::send(reader_ch, tcp_read_err(err_data.to_tcp_err()));
      }
      // do nothing .. unneeded buf
      0 {}
      // have data
      _ {
        log(debug, #fmt("tcp on_read_cb nread: %d", nread));
        let buf_base = uv::ll::get_base_from_buf(buf);
        // only the first nread bytes of the buffer were filled in; the
        // buffer's own length is the allocated size, so copying that
        // much would append uninitialized bytes to the data
        let new_bytes = vec::unsafe::from_buf(buf_base, nread as uint);
        comm::send(reader_ch, tcp_read_data(new_bytes));
      }
    }
    uv::ll::free_base_of_buf(buf);
}

// libuv allocation callback used by `read_start`: allocates a buffer of
// `suggested_size` bytes and returns it wrapped as a `uv_buf_t`. The buffer
// is released again in `on_tcp_read_cb`.
crust fn on_alloc_cb(handle: *libc::c_void,
                     ++suggested_size: libc::size_t)
    -> uv::ll::uv_buf_t unsafe {
    log(debug, "tcp read on_alloc_cb!");
    let base = uv::ll::malloc_buf_base_of(suggested_size);
    let base_addr = base as uint;
    let wanted = suggested_size as uint;
    log(debug, #fmt("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
                     handle,
                     base_addr,
                     wanted));
    uv::ll::buf_init(base, suggested_size)
}
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343

// Data attached to the stream handle by the `tcp_socket` destructor.
// `tcp_socket_dtor_close_cb` sends on `closed_ch` once the handle is closed.
type tcp_socket_close_data = {
    closed_ch: comm::chan<()>
};

// Close callback installed by the `tcp_socket` destructor: reads the
// `tcp_socket_close_data` stored on the handle and signals its channel so
// the waiting destructor can finish.
crust fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let close_data_ptr = uv::ll::get_data_for_uv_handle(handle)
        as *tcp_socket_close_data;
    comm::send((*close_data_ptr).closed_ch, ());
    log(debug, "tcp_socket_dtor_close_cb exiting..");
}

// libuv write-completion callback installed by `write`.
//
// * write_req - the completed request; its data slot holds a
//   `*write_req_data`
// * status - zero on success, anything else on failure
//
// Sends the resulting `tcp_write_result` on the request's `result_ch`.
crust fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
                              status: libc::c_int) unsafe {
    let write_data_ptr = uv::ll::get_data_for_req(write_req)
        as *write_req_data;
    alt status {
      0i32 {
        log(debug, "successful write complete");
        comm::send((*write_data_ptr).result_ch, tcp_write_success);
      }
      _ {
        let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
            write_req);
        let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        log(debug, "failure to write");
        // convert to tcp_err_data, as write() does on its own error path
        comm::send((*write_data_ptr).result_ch,
                   tcp_write_error(err_data.to_tcp_err()));
      }
    }
}

// Data attached to a write request by `write`. `tcp_write_complete_cb`
// sends the outcome of the write on `result_ch`.
type write_req_data = {
    result_ch: comm::chan<tcp_write_result>
};

J
Jeff Olson 已提交
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
// Data shared between `connect` and its libuv callbacks.
type connect_req_data = {
    // receives the outcome of the connection attempt
    result_ch: comm::chan<conn_attempt>,
    // signalled by `stream_error_close_cb` once a failed connection's
    // handle has been closed
    closed_signal_ch: comm::chan<()>
};

// Close callback used when a connection attempt fails: reads the
// `connect_req_data` stored on the handle and signals `closed_signal_ch` so
// that `connect` knows the handle has been closed.
crust fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let data = uv::ll::get_data_for_uv_handle(handle) as
        *connect_req_data;
    comm::send((*data).closed_signal_ch, ());
    // the message previously misnamed this function as "steam_..."
    log(debug, #fmt("exiting stream_error_close_cb for %?", handle));
}

// Close callback that only logs the handle that was closed.
crust fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
    let msg = #fmt("closed client tcp handle %?", handle);
    log(debug, msg);
}

// libuv connect callback installed by `connect`.
//
// * connect_req_ptr - the connect request; its data slot holds a
//   `*connect_req_data`
// * status - zero if the connection was established
//
// Sends `conn_success` or `conn_failure` on the request's `result_ch`. On
// failure the stream handle is closed as well, with `stream_error_close_cb`
// as the close callback.
crust fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
                                   status: libc::c_int) unsafe {
    let req_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
                        as *connect_req_data);
    let outcome_ch = (*req_data_ptr).result_ch;
    log(debug, #fmt("tcp_connect result_ch %?", outcome_ch));
    let handle_ptr =
        uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
    if status == 0i32 {
        log(debug, "successful tcp connection!");
        comm::send(outcome_ch, conn_success);
    } else {
        log(debug, "error in tcp_connect_on_connect_cb");
        let loop_ptr = uv::ll::get_loop_for_uv_handle(handle_ptr);
        let err_data = uv::ll::get_last_err_data(loop_ptr);
        log(debug, #fmt("err_data %? %?", err_data.err_name,
                        err_data.err_msg));
        comm::send(outcome_ch, conn_failure(err_data));
        // the close callback needs the connect data to signal completion
        uv::ll::set_data_for_uv_handle(handle_ptr,
                                       req_data_ptr);
        uv::ll::close(handle_ptr, stream_error_close_cb);
    }
    log(debug, "leaving tcp_connect_on_connect_cb");
}

// Internal outcome of a connection attempt, sent from the libuv callbacks
// to `connect` over `result_ch`.
enum conn_attempt {
    // the connection was established
    conn_success,
    // the attempt failed; carries the error reported by libuv
    conn_failure(uv::ll::uv_err_data)
}

// Per-connection state owned by a `tcp_socket`.
type tcp_socket_data = {
    // port handed to callers by `read_start`
    reader_po: comm::port<tcp_read_result>,
    // channel on which `on_tcp_read_cb` delivers read results
    reader_ch: comm::chan<tcp_read_result>,
    // the libuv tcp stream handle for this connection
    stream_handle: uv::ll::uv_tcp_t,
    // request object used by `connect`
    connect_req: uv::ll::uv_connect_t,
    // request object used by `write`
    write_req: uv::ll::uv_write_t,
    // the loop on which all operations for this socket are scheduled
    hl_loop: uv::hl::high_level_loop
};

// Build libuv's native `sockaddr_in` from a rust ip_addr and a port: the
// address is formatted as a string and passed, together with the port, to
// `uv::ll::ip4_addr`.
fn ipv4_ip_addr_to_sockaddr_in(input: ip::ip_addr,
                               port: uint) -> uv::ll::sockaddr_in unsafe {
    let addr_str = ip::format_addr(input);
    uv::ll::ip4_addr(addr_str, port as int)
}

#[cfg(test)]
409 410 411
mod test {
    #[test]
    fn test_gl_tcp_ipv4_request() {
412
        let ip_str = "173.194.79.99";
413 414
        let port = 80u;
        let expected_read_msg = "foo";
415
        let actual_write_msg = "GET / HTTP/1.1\r\n\r\n";
J
Jeff Olson 已提交
416
        let host_ip = ip::v4::parse_addr(ip_str);
417 418 419 420

        let data_po = comm::port::<[u8]>();
        let data_ch = comm::chan(data_po);
        
J
Jeff Olson 已提交
421 422 423
        alt connect(host_ip, port) {
          tcp_connected(sock) {
            log(debug, "successful tcp connect");
424 425 426 427 428
            let mut write_data: [[u8]] = [];
            let write_data = [str::as_bytes(actual_write_msg) {|str_bytes|
                str_bytes
            }];
            alt write(sock, write_data) {
429
              tcp_write_success {
430
                log(debug, "tcp::write successful");
J
Jeff Olson 已提交
431
                let mut total_read_data: [u8] = [];
432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
                alt read_start(sock) {
                  tcp_read_start_success(reader_po) {
                    loop {
                        alt comm::recv(reader_po) {
                          tcp_read_data(new_data) {
                            total_read_data += new_data;
                            // theoretically, we could keep iterating, if
                            // we expect the server on the other end to keep
                            // streaming/chunking data to us, but..
                            alt read_stop(sock) {
                              some(err_data) {
                                log(debug, "error while calling read_stop");
                                log(debug, #fmt("read_stop error: %? %?",
                                                err_data.err_name,
                                                err_data.err_msg));
                                assert false;
                              }
                              none {
                                // exiting the read loop
                                break;
                              }
                            }
                          }
                          tcp_read_done {
                            break;
                          }
                          tcp_read_err(err_data) {
                            log(debug, "read error data recv'd");
                            log(debug, #fmt("read error: %? %?",
                                            err_data.err_name,
                                            err_data.err_msg));
                            assert false;
                          }
                        }
466
                    }
467 468 469 470 471 472 473 474 475
                    comm::send(data_ch, total_read_data);
                  }
                  tcp_read_start_error(err_data) {
                    log(debug, "tcp_read_start_error received..");
                    log(debug, #fmt("tcp read_start error: %? %?",
                                    err_data.err_name,
                                   err_data.err_msg));
                    assert false;
                  }
476 477
                }
              }
478 479 480 481 482
              tcp_write_error(err_data) {
                log(debug, "tcp_write_error received..");
                log(debug, #fmt("tcp write error: %? %?", err_data.err_name,
                               err_data.err_msg));
                assert false;
483 484 485
              }
            }
          }
J
Jeff Olson 已提交
486 487 488 489 490
          tcp_connect_error(err_data) {
            log(debug, "tcp_connect_error received..");
            log(debug, #fmt("tcp connect error: %? %?", err_data.err_name,
                           err_data.err_msg));
            assert false;
491 492 493 494 495 496 497 498
          }
        }

        let actual_data = comm::recv(data_po);
        let resp = str::from_bytes(actual_data);
        log(debug, "DATA RECEIVED: "+resp);
    }
}