提交 0367afb6 编写于 作者: L liucc1997 提交者: ob-robot

rpc: add userspace socket send data stuck check

上级 bca5c691
......@@ -655,6 +655,7 @@ GLOBAL_ERRSIM_POINT_DEF(2551, EN_SPI_GET_NEXT_ROW, "Used to check cursor fetch")
// RPC framework 2601 - 2620
GLOBAL_ERRSIM_POINT_DEF(2601, EN_RPC_SOCKET_ERROR, "inject rpc socket error, making rpc failed");
GLOBAL_ERRSIM_POINT_DEF(2602, EN_RPC_IO_THREAD_HANG, "make rpc io thread hang for a while");
GLOBAL_ERRSIM_POINT_DEF(2603, EN_RPC_DISABLE_RPC_SOCKET_KEEPALIVE, "operating system level disable rpc socket keepalive");
// HTable Parallel DDL begin 2621 - 2650
GLOBAL_ERRSIM_POINT_DEF(2621, EN_CREATE_HTABLE_TG_FINISH_ERR, "create htable tablegroup finish error");
......
......@@ -84,6 +84,13 @@ void keepalive_check(pktc_t* client_io, pkts_t* server_io) {
update_socket_keepalive_params(sk->fd, pnio_keepalive_timeout);
sk->user_keepalive_timeout = pnio_keepalive_timeout;
}
if (pnio_keepalive_timeout > 0
&& 0 != check_socket_write_ack(sk->fd, &sk->sk_diag_info, pnio_keepalive_timeout)) {
rk_warn("socket send data is hung and it will be closed, sock=%p, dest=%d:%d",
sk, sk->dest.ip, sk->dest.port);
sk->mask |= EPOLLERR;
eloop_fire(client_io->ep, (sock_t*)sk);
}
}
// walks through pkts_t skmap, refresh tcp keepalive params
if (pkts_is_init(server_io) && server_io->user_keepalive_timeout != pnio_keepalive_timeout) {
......@@ -95,6 +102,18 @@ void keepalive_check(pktc_t* client_io, pkts_t* server_io) {
}
server_io->user_keepalive_timeout = pnio_keepalive_timeout;
}
if (pnio_keepalive_timeout > 0 && pkts_is_init(server_io)) {
dlink_for(&server_io->sk_list, p) {
pkts_sk_t *sk = structof(p, pkts_sk_t, list_link);
if (0 != check_socket_write_ack(sk->fd, &sk->sk_diag_info, pnio_keepalive_timeout)) {
char peer_addr_buf[PNIO_NIO_ADDR_LEN] = {'\0'};
rk_warn("socket send data is hung and it will be closed, sock=%p, peer=%s",
sk, addr_str(sk->peer, peer_addr_buf, sizeof(peer_addr_buf)));
sk->mask |= EPOLLERR;
eloop_fire(server_io->ep, (sock_t*)sk);
}
}
}
}
}
......
......@@ -10,6 +10,7 @@
* See the Mulan PubL v2 for more details.
*/
#include <sys/ioctl.h>
#define PNIO_TCP_SYNCNT 3
int check_connect_result(int fd) {
int err = 0;
......@@ -128,6 +129,13 @@ int set_sock_opt(int fd, int option, int value)
}
void update_socket_keepalive_params(int fd, int64_t user_timeout) {
#ifdef ERRSIM
int tmp_ret = OB_E(EventTable::EN_RPC_DISABLE_RPC_SOCKET_KEEPALIVE) OB_SUCCESS;
if (OB_SUCCESS != tmp_ret) {
rk_warn("disable rpc socket keepalive for errsim, fd=%d, tmp_ret=%d", fd, tmp_ret);
return;
}
#endif
int tcp_keepalive = (user_timeout > 0) ? 1: 0;
int tcp_keepidle = user_timeout/5000000;
if (tcp_keepidle < 1) {
......@@ -146,13 +154,35 @@ void update_socket_keepalive_params(int fd, int64_t user_timeout) {
ignore_ret_value(set_tcpopt(fd, TCP_USER_TIMEOUT, tcp_user_timeout));
}
} else {
if (set_tcpopt(fd, SO_KEEPALIVE, 0)) {
if (set_sock_opt(fd, SO_KEEPALIVE, 0)) {
rk_warn("disable SO_KEEPALIVE error: %d, fd=%d\n", errno, fd);
} else {
ignore_ret_value(set_tcpopt(fd, TCP_USER_TIMEOUT, 0));
}
}
}
int check_socket_write_ack(int fd, socket_diag_info_t* sk_diag_info, const int64_t user_timeout) {
int err = 0;
int qlen = 0;
if (0 != ioctl(fd, TIOCOUTQ, &qlen)) {
// this case not to close socket, just to log error
rk_error("Failed to do TIOCOUTQ ioctl on fd=%d, errno(%d)", fd, errno);
} else {
int64_t cur_time = rk_get_us();
int64_t ack_size = sk_diag_info->write_size - qlen;
if (qlen <= 0 || ack_size != sk_diag_info->write_ack_size) {
sk_diag_info->write_ack_time = cur_time;
sk_diag_info->write_ack_size = ack_size;
} else if (0 != sk_diag_info->write_ack_time && cur_time - sk_diag_info->write_ack_time > user_timeout) {
err = -EIO;
rk_error("socket is unable to send data for more than %ld us, tcp socket is hung or cpu load is too high, "
"the socket will be closed, fd=%d, write_ack_time=%ld, write_ack_size=%ld, qlen=%d",
user_timeout, fd, sk_diag_info->write_ack_time, sk_diag_info->write_ack_size, qlen);
}
}
return err;
}
int set_tcp_recv_buf(int fd, int size) {
return setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const char*)&size, sizeof(size));
......
......@@ -24,6 +24,7 @@ extern int set_tcp_linger_on(int fd);
extern int set_tcp_nodelay(int fd);
extern int set_tcpopt(int fd, int option, int value);
extern void update_socket_keepalive_params(int fd, int64_t user_timeout);
extern int check_socket_write_ack(int fd, socket_diag_info_t* sk_diag_info, const int64_t user_timeout);
extern int set_tcp_recv_buf(int fd, int size);
extern int set_tcp_send_buf(int fd, int size);
extern const char* sock_fd_str(int fd, char *buf, int buf_len);
......
......@@ -28,6 +28,8 @@ typedef struct socket_diag_info_t
int64_t last_read_time;
uint64_t write_cnt;
uint64_t write_size;
uint64_t write_ack_size; // the size of data that has been acknowledged by the remote side
int64_t write_ack_time; // the last check time when the write_ack_size is updated
uint64_t write_wait_time;
uint64_t read_cnt;
uint64_t read_size;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册