提交 9e550f01 编写于 作者: D David S. Miller

Merge branch 'rxrpc-fixes'

David Howells says:

====================
rxrpc: Fixes

Here is a collection of fixes for rxrpc:

 (1) rxrpc_error_report() needs to call sock_error() to clear the error
     code from the UDP transport socket, lest it be unexpectedly revisited
     on the next kernel_sendmsg() call.  This has been causing all sorts of
     weird effects in AFS as the effects have typically been felt by the
     wrong RxRPC call.

 (2) Allow a kernel user of AF_RXRPC to easily detect if an rxrpc call has
     completed.

 (3) Allow errors incurred by attempting to transmit data through the UDP
     socket to get back up the stack to AFS.

 (4) Make AFS use (2) to abort the synchronous-mode call waiting loop if
     the rxrpc-level call completed.

 (5) Add a missing tracepoint case for tracing abort reception.

 (6) Fix detection and handling of out-of-order ACKs.

====================
Tested-by: NJonathan Billings <jsbillin@umich.edu>
Signed-off-by: NDavid S. Miller <davem@davemloft.net>
...@@ -1009,16 +1009,18 @@ The kernel interface functions are as follows: ...@@ -1009,16 +1009,18 @@ The kernel interface functions are as follows:
(*) Check call still alive. (*) Check call still alive.
u32 rxrpc_kernel_check_life(struct socket *sock, bool rxrpc_kernel_check_life(struct socket *sock,
struct rxrpc_call *call); struct rxrpc_call *call,
u32 *_life);
void rxrpc_kernel_probe_life(struct socket *sock, void rxrpc_kernel_probe_life(struct socket *sock,
struct rxrpc_call *call); struct rxrpc_call *call);
The first function returns a number that is updated when ACKs are received The first function passes back in *_life a number that is updated when
from the peer (notably including PING RESPONSE ACKs which we can elicit by ACKs are received from the peer (notably including PING RESPONSE ACKs
sending PING ACKs to see if the call still exists on the server). The which we can elicit by sending PING ACKs to see if the call still exists
caller should compare the numbers of two calls to see if the call is still on the server). The caller should compare the numbers of two calls to see
alive after waiting for a suitable interval. if the call is still alive after waiting for a suitable interval. It also
returns true as long as the call hasn't yet reached the completed state.
This allows the caller to work out if the server is still contactable and This allows the caller to work out if the server is still contactable and
if the call is still alive on the server while waiting for the server to if the call is still alive on the server while waiting for the server to
......
...@@ -610,6 +610,7 @@ static long afs_wait_for_call_to_complete(struct afs_call *call, ...@@ -610,6 +610,7 @@ static long afs_wait_for_call_to_complete(struct afs_call *call,
bool stalled = false; bool stalled = false;
u64 rtt; u64 rtt;
u32 life, last_life; u32 life, last_life;
bool rxrpc_complete = false;
DECLARE_WAITQUEUE(myself, current); DECLARE_WAITQUEUE(myself, current);
...@@ -621,7 +622,7 @@ static long afs_wait_for_call_to_complete(struct afs_call *call, ...@@ -621,7 +622,7 @@ static long afs_wait_for_call_to_complete(struct afs_call *call,
rtt2 = 2; rtt2 = 2;
timeout = rtt2; timeout = rtt2;
last_life = rxrpc_kernel_check_life(call->net->socket, call->rxcall); rxrpc_kernel_check_life(call->net->socket, call->rxcall, &last_life);
add_wait_queue(&call->waitq, &myself); add_wait_queue(&call->waitq, &myself);
for (;;) { for (;;) {
...@@ -639,7 +640,12 @@ static long afs_wait_for_call_to_complete(struct afs_call *call, ...@@ -639,7 +640,12 @@ static long afs_wait_for_call_to_complete(struct afs_call *call,
if (afs_check_call_state(call, AFS_CALL_COMPLETE)) if (afs_check_call_state(call, AFS_CALL_COMPLETE))
break; break;
life = rxrpc_kernel_check_life(call->net->socket, call->rxcall); if (!rxrpc_kernel_check_life(call->net->socket, call->rxcall, &life)) {
/* rxrpc terminated the call. */
rxrpc_complete = true;
break;
}
if (timeout == 0 && if (timeout == 0 &&
life == last_life && signal_pending(current)) { life == last_life && signal_pending(current)) {
if (stalled) if (stalled)
...@@ -663,12 +669,16 @@ static long afs_wait_for_call_to_complete(struct afs_call *call, ...@@ -663,12 +669,16 @@ static long afs_wait_for_call_to_complete(struct afs_call *call,
remove_wait_queue(&call->waitq, &myself); remove_wait_queue(&call->waitq, &myself);
__set_current_state(TASK_RUNNING); __set_current_state(TASK_RUNNING);
/* Kill off the call if it's still live. */
if (!afs_check_call_state(call, AFS_CALL_COMPLETE)) { if (!afs_check_call_state(call, AFS_CALL_COMPLETE)) {
_debug("call interrupted"); if (rxrpc_complete) {
if (rxrpc_kernel_abort_call(call->net->socket, call->rxcall, afs_set_call_complete(call, call->error, call->abort_code);
RX_USER_ABORT, -EINTR, "KWI")) } else {
afs_set_call_complete(call, -EINTR, 0); /* Kill off the call if it's still live. */
_debug("call interrupted");
if (rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
RX_USER_ABORT, -EINTR, "KWI"))
afs_set_call_complete(call, -EINTR, 0);
}
} }
spin_lock_bh(&call->state_lock); spin_lock_bh(&call->state_lock);
......
...@@ -61,10 +61,12 @@ int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t, ...@@ -61,10 +61,12 @@ int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
rxrpc_user_attach_call_t, unsigned long, gfp_t, rxrpc_user_attach_call_t, unsigned long, gfp_t,
unsigned int); unsigned int);
void rxrpc_kernel_set_tx_length(struct socket *, struct rxrpc_call *, s64); void rxrpc_kernel_set_tx_length(struct socket *, struct rxrpc_call *, s64);
u32 rxrpc_kernel_check_life(const struct socket *, const struct rxrpc_call *); bool rxrpc_kernel_check_life(const struct socket *, const struct rxrpc_call *,
u32 *);
void rxrpc_kernel_probe_life(struct socket *, struct rxrpc_call *); void rxrpc_kernel_probe_life(struct socket *, struct rxrpc_call *);
u32 rxrpc_kernel_get_epoch(struct socket *, struct rxrpc_call *); u32 rxrpc_kernel_get_epoch(struct socket *, struct rxrpc_call *);
bool rxrpc_kernel_get_reply_time(struct socket *, struct rxrpc_call *, bool rxrpc_kernel_get_reply_time(struct socket *, struct rxrpc_call *,
ktime_t *); ktime_t *);
bool rxrpc_kernel_call_is_complete(struct rxrpc_call *);
#endif /* _NET_RXRPC_H */ #endif /* _NET_RXRPC_H */
...@@ -371,18 +371,22 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call); ...@@ -371,18 +371,22 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
* rxrpc_kernel_check_life - Check to see whether a call is still alive * rxrpc_kernel_check_life - Check to see whether a call is still alive
* @sock: The socket the call is on * @sock: The socket the call is on
* @call: The call to check * @call: The call to check
* @_life: Where to store the life value
* *
* Allow a kernel service to find out whether a call is still alive - ie. we're * Allow a kernel service to find out whether a call is still alive - ie. we're
* getting ACKs from the server. Returns a number representing the life state * getting ACKs from the server. Passes back in *_life a number representing
* which can be compared to that returned by a previous call. * the life state which can be compared to that returned by a previous call and
* return true if the call is still alive.
* *
* If the life state stalls, rxrpc_kernel_probe_life() should be called and * If the life state stalls, rxrpc_kernel_probe_life() should be called and
* then 2RTT waited. * then 2RTT waited.
*/ */
u32 rxrpc_kernel_check_life(const struct socket *sock, bool rxrpc_kernel_check_life(const struct socket *sock,
const struct rxrpc_call *call) const struct rxrpc_call *call,
u32 *_life)
{ {
return call->acks_latest; *_life = call->acks_latest;
return call->state != RXRPC_CALL_COMPLETE;
} }
EXPORT_SYMBOL(rxrpc_kernel_check_life); EXPORT_SYMBOL(rxrpc_kernel_check_life);
......
...@@ -654,6 +654,7 @@ struct rxrpc_call { ...@@ -654,6 +654,7 @@ struct rxrpc_call {
u8 ackr_reason; /* reason to ACK */ u8 ackr_reason; /* reason to ACK */
u16 ackr_skew; /* skew on packet being ACK'd */ u16 ackr_skew; /* skew on packet being ACK'd */
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */ rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
rxrpc_serial_t ackr_first_seq; /* first sequence number received */
rxrpc_seq_t ackr_prev_seq; /* previous sequence number received */ rxrpc_seq_t ackr_prev_seq; /* previous sequence number received */
rxrpc_seq_t ackr_consumed; /* Highest packet shown consumed */ rxrpc_seq_t ackr_consumed; /* Highest packet shown consumed */
rxrpc_seq_t ackr_seen; /* Highest packet shown seen */ rxrpc_seq_t ackr_seen; /* Highest packet shown seen */
......
...@@ -153,7 +153,8 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, ...@@ -153,7 +153,8 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
* pass a connection-level abort onto all calls on that connection * pass a connection-level abort onto all calls on that connection
*/ */
static void rxrpc_abort_calls(struct rxrpc_connection *conn, static void rxrpc_abort_calls(struct rxrpc_connection *conn,
enum rxrpc_call_completion compl) enum rxrpc_call_completion compl,
rxrpc_serial_t serial)
{ {
struct rxrpc_call *call; struct rxrpc_call *call;
int i; int i;
...@@ -173,6 +174,9 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, ...@@ -173,6 +174,9 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn,
call->call_id, 0, call->call_id, 0,
conn->abort_code, conn->abort_code,
conn->error); conn->error);
else
trace_rxrpc_rx_abort(call, serial,
conn->abort_code);
if (rxrpc_set_call_completion(call, compl, if (rxrpc_set_call_completion(call, compl,
conn->abort_code, conn->abort_code,
conn->error)) conn->error))
...@@ -213,8 +217,6 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn, ...@@ -213,8 +217,6 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn,
conn->state = RXRPC_CONN_LOCALLY_ABORTED; conn->state = RXRPC_CONN_LOCALLY_ABORTED;
spin_unlock_bh(&conn->state_lock); spin_unlock_bh(&conn->state_lock);
rxrpc_abort_calls(conn, RXRPC_CALL_LOCALLY_ABORTED);
msg.msg_name = &conn->params.peer->srx.transport; msg.msg_name = &conn->params.peer->srx.transport;
msg.msg_namelen = conn->params.peer->srx.transport_len; msg.msg_namelen = conn->params.peer->srx.transport_len;
msg.msg_control = NULL; msg.msg_control = NULL;
...@@ -242,6 +244,7 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn, ...@@ -242,6 +244,7 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn,
len = iov[0].iov_len + iov[1].iov_len; len = iov[0].iov_len + iov[1].iov_len;
serial = atomic_inc_return(&conn->serial); serial = atomic_inc_return(&conn->serial);
rxrpc_abort_calls(conn, RXRPC_CALL_LOCALLY_ABORTED, serial);
whdr.serial = htonl(serial); whdr.serial = htonl(serial);
_proto("Tx CONN ABORT %%%u { %d }", serial, conn->abort_code); _proto("Tx CONN ABORT %%%u { %d }", serial, conn->abort_code);
...@@ -321,7 +324,7 @@ static int rxrpc_process_event(struct rxrpc_connection *conn, ...@@ -321,7 +324,7 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
conn->error = -ECONNABORTED; conn->error = -ECONNABORTED;
conn->abort_code = abort_code; conn->abort_code = abort_code;
conn->state = RXRPC_CONN_REMOTELY_ABORTED; conn->state = RXRPC_CONN_REMOTELY_ABORTED;
rxrpc_abort_calls(conn, RXRPC_CALL_REMOTELY_ABORTED); rxrpc_abort_calls(conn, RXRPC_CALL_REMOTELY_ABORTED, sp->hdr.serial);
return -ECONNABORTED; return -ECONNABORTED;
case RXRPC_PACKET_TYPE_CHALLENGE: case RXRPC_PACKET_TYPE_CHALLENGE:
......
...@@ -837,7 +837,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb, ...@@ -837,7 +837,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
u8 acks[RXRPC_MAXACKS]; u8 acks[RXRPC_MAXACKS];
} buf; } buf;
rxrpc_serial_t acked_serial; rxrpc_serial_t acked_serial;
rxrpc_seq_t first_soft_ack, hard_ack; rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt;
int nr_acks, offset, ioffset; int nr_acks, offset, ioffset;
_enter(""); _enter("");
...@@ -851,13 +851,14 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb, ...@@ -851,13 +851,14 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
acked_serial = ntohl(buf.ack.serial); acked_serial = ntohl(buf.ack.serial);
first_soft_ack = ntohl(buf.ack.firstPacket); first_soft_ack = ntohl(buf.ack.firstPacket);
prev_pkt = ntohl(buf.ack.previousPacket);
hard_ack = first_soft_ack - 1; hard_ack = first_soft_ack - 1;
nr_acks = buf.ack.nAcks; nr_acks = buf.ack.nAcks;
summary.ack_reason = (buf.ack.reason < RXRPC_ACK__INVALID ? summary.ack_reason = (buf.ack.reason < RXRPC_ACK__INVALID ?
buf.ack.reason : RXRPC_ACK__INVALID); buf.ack.reason : RXRPC_ACK__INVALID);
trace_rxrpc_rx_ack(call, sp->hdr.serial, acked_serial, trace_rxrpc_rx_ack(call, sp->hdr.serial, acked_serial,
first_soft_ack, ntohl(buf.ack.previousPacket), first_soft_ack, prev_pkt,
summary.ack_reason, nr_acks); summary.ack_reason, nr_acks);
if (buf.ack.reason == RXRPC_ACK_PING_RESPONSE) if (buf.ack.reason == RXRPC_ACK_PING_RESPONSE)
...@@ -878,8 +879,9 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb, ...@@ -878,8 +879,9 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
rxrpc_propose_ack_respond_to_ack); rxrpc_propose_ack_respond_to_ack);
} }
/* Discard any out-of-order or duplicate ACKs. */ /* Discard any out-of-order or duplicate ACKs (outside lock). */
if (before_eq(sp->hdr.serial, call->acks_latest)) if (before(first_soft_ack, call->ackr_first_seq) ||
before(prev_pkt, call->ackr_prev_seq))
return; return;
buf.info.rxMTU = 0; buf.info.rxMTU = 0;
...@@ -890,12 +892,16 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb, ...@@ -890,12 +892,16 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
spin_lock(&call->input_lock); spin_lock(&call->input_lock);
/* Discard any out-of-order or duplicate ACKs. */ /* Discard any out-of-order or duplicate ACKs (inside lock). */
if (before_eq(sp->hdr.serial, call->acks_latest)) if (before(first_soft_ack, call->ackr_first_seq) ||
before(prev_pkt, call->ackr_prev_seq))
goto out; goto out;
call->acks_latest_ts = skb->tstamp; call->acks_latest_ts = skb->tstamp;
call->acks_latest = sp->hdr.serial; call->acks_latest = sp->hdr.serial;
call->ackr_first_seq = first_soft_ack;
call->ackr_prev_seq = prev_pkt;
/* Parse rwind and mtu sizes if provided. */ /* Parse rwind and mtu sizes if provided. */
if (buf.info.rxMTU) if (buf.info.rxMTU)
rxrpc_input_ackinfo(call, skb, &buf.info); rxrpc_input_ackinfo(call, skb, &buf.info);
......
...@@ -157,6 +157,11 @@ void rxrpc_error_report(struct sock *sk) ...@@ -157,6 +157,11 @@ void rxrpc_error_report(struct sock *sk)
_enter("%p{%d}", sk, local->debug_id); _enter("%p{%d}", sk, local->debug_id);
/* Clear the outstanding error value on the socket so that it doesn't
* cause kernel_sendmsg() to return it later.
*/
sock_error(sk);
skb = sock_dequeue_err_skb(sk); skb = sock_dequeue_err_skb(sk);
if (!skb) { if (!skb) {
_leave("UDP socket errqueue empty"); _leave("UDP socket errqueue empty");
......
...@@ -152,12 +152,13 @@ static void rxrpc_notify_end_tx(struct rxrpc_sock *rx, struct rxrpc_call *call, ...@@ -152,12 +152,13 @@ static void rxrpc_notify_end_tx(struct rxrpc_sock *rx, struct rxrpc_call *call,
} }
/* /*
* Queue a DATA packet for transmission, set the resend timeout and send the * Queue a DATA packet for transmission, set the resend timeout and send
* packet immediately * the packet immediately. Returns the error from rxrpc_send_data_packet()
* in case the caller wants to do something with it.
*/ */
static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, static int rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
struct sk_buff *skb, bool last, struct sk_buff *skb, bool last,
rxrpc_notify_end_tx_t notify_end_tx) rxrpc_notify_end_tx_t notify_end_tx)
{ {
struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
unsigned long now; unsigned long now;
...@@ -250,7 +251,8 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, ...@@ -250,7 +251,8 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
out: out:
rxrpc_free_skb(skb, rxrpc_skb_tx_freed); rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
_leave(""); _leave(" = %d", ret);
return ret;
} }
/* /*
...@@ -423,9 +425,10 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, ...@@ -423,9 +425,10 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
if (ret < 0) if (ret < 0)
goto out; goto out;
rxrpc_queue_packet(rx, call, skb, ret = rxrpc_queue_packet(rx, call, skb,
!msg_data_left(msg) && !more, !msg_data_left(msg) && !more,
notify_end_tx); notify_end_tx);
/* Should check for failure here */
skb = NULL; skb = NULL;
} }
} while (msg_data_left(msg) > 0); } while (msg_data_left(msg) > 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册