提交 7c13f97f 编写于 作者: P Paolo Abeni 提交者: David S. Miller

udp: do fwd memory scheduling on dequeue

A new argument is added to __skb_recv_datagram to provide
an explicit skb destructor, invoked under the receive queue
lock.
The UDP protocol uses such argument to perform memory
reclaiming on dequeue, so that the UDP protocol does not
set anymore skb->desctructor.
Instead explicit memory reclaiming is performed at close() time and
when skbs are removed from the receive queue.
The in kernel UDP protocol users now need to call a
skb_recv_udp() variant instead of skb_recv_datagram() to
properly perform memory accounting on dequeue.

Overall, this allows acquiring only once the receive queue
lock on dequeue.

Tested using pktgen with random src port, 64 bytes packet,
wire-speed on a 10G link as sender and udp_sink as the receiver,
using an l4 tuple rxhash to stress the contention, and one or more
udp_sink instances with reuseport.

nr sinks	vanilla		patched
1		440		560
3		2150		2300
6		3650		3800
9		4450		4600
12		6250		6450

v1 -> v2:
 - do rmem and allocated memory scheduling under the receive lock
 - do bulk scheduling in first_packet_length() and in udp_destruct_sock()
 - avoid the typdef for the dequeue callback
Suggested-by: NEric Dumazet <edumazet@google.com>
Acked-by: NHannes Frederic Sowa <hannes@stressinduktion.org>
Signed-off-by: NPaolo Abeni <pabeni@redhat.com>
Acked-by: NEric Dumazet <edumazet@google.com>
Signed-off-by: NDavid S. Miller <davem@davemloft.net>
上级 ad959036
...@@ -3033,9 +3033,13 @@ static inline void skb_frag_list_init(struct sk_buff *skb) ...@@ -3033,9 +3033,13 @@ static inline void skb_frag_list_init(struct sk_buff *skb)
int __skb_wait_for_more_packets(struct sock *sk, int *err, long *timeo_p, int __skb_wait_for_more_packets(struct sock *sk, int *err, long *timeo_p,
const struct sk_buff *skb); const struct sk_buff *skb);
struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned flags, struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned flags,
void (*destructor)(struct sock *sk,
struct sk_buff *skb),
int *peeked, int *off, int *err, int *peeked, int *off, int *err,
struct sk_buff **last); struct sk_buff **last);
struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned flags, struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned flags,
void (*destructor)(struct sock *sk,
struct sk_buff *skb),
int *peeked, int *off, int *err); int *peeked, int *off, int *err);
struct sk_buff *skb_recv_datagram(struct sock *sk, unsigned flags, int noblock, struct sk_buff *skb_recv_datagram(struct sock *sk, unsigned flags, int noblock,
int *err); int *err);
......
...@@ -248,6 +248,21 @@ static inline __be16 udp_flow_src_port(struct net *net, struct sk_buff *skb, ...@@ -248,6 +248,21 @@ static inline __be16 udp_flow_src_port(struct net *net, struct sk_buff *skb,
/* net/ipv4/udp.c */ /* net/ipv4/udp.c */
void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len); void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len);
int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb); int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb);
void udp_skb_destructor(struct sock *sk, struct sk_buff *skb);
static inline struct sk_buff *
__skb_recv_udp(struct sock *sk, unsigned int flags, int noblock, int *peeked,
int *off, int *err)
{
return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
udp_skb_destructor, peeked, off, err);
}
static inline struct sk_buff *skb_recv_udp(struct sock *sk, unsigned int flags,
int noblock, int *err)
{
int peeked, off = 0;
return __skb_recv_udp(sk, flags, noblock, &peeked, &off, err);
}
void udp_v4_early_demux(struct sk_buff *skb); void udp_v4_early_demux(struct sk_buff *skb);
int udp_get_port(struct sock *sk, unsigned short snum, int udp_get_port(struct sock *sk, unsigned short snum,
......
...@@ -165,6 +165,7 @@ static struct sk_buff *skb_set_peeked(struct sk_buff *skb) ...@@ -165,6 +165,7 @@ static struct sk_buff *skb_set_peeked(struct sk_buff *skb)
* __skb_try_recv_datagram - Receive a datagram skbuff * __skb_try_recv_datagram - Receive a datagram skbuff
* @sk: socket * @sk: socket
* @flags: MSG_ flags * @flags: MSG_ flags
* @destructor: invoked under the receive lock on successful dequeue
* @peeked: returns non-zero if this packet has been seen before * @peeked: returns non-zero if this packet has been seen before
* @off: an offset in bytes to peek skb from. Returns an offset * @off: an offset in bytes to peek skb from. Returns an offset
* within an skb where data actually starts * within an skb where data actually starts
...@@ -197,6 +198,8 @@ static struct sk_buff *skb_set_peeked(struct sk_buff *skb) ...@@ -197,6 +198,8 @@ static struct sk_buff *skb_set_peeked(struct sk_buff *skb)
* the standard around please. * the standard around please.
*/ */
struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned int flags, struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned int flags,
void (*destructor)(struct sock *sk,
struct sk_buff *skb),
int *peeked, int *off, int *err, int *peeked, int *off, int *err,
struct sk_buff **last) struct sk_buff **last)
{ {
...@@ -241,9 +244,11 @@ struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned int flags, ...@@ -241,9 +244,11 @@ struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned int flags,
} }
atomic_inc(&skb->users); atomic_inc(&skb->users);
} else } else {
__skb_unlink(skb, queue); __skb_unlink(skb, queue);
if (destructor)
destructor(sk, skb);
}
spin_unlock_irqrestore(&queue->lock, cpu_flags); spin_unlock_irqrestore(&queue->lock, cpu_flags);
*off = _off; *off = _off;
return skb; return skb;
...@@ -262,6 +267,8 @@ struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned int flags, ...@@ -262,6 +267,8 @@ struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned int flags,
EXPORT_SYMBOL(__skb_try_recv_datagram); EXPORT_SYMBOL(__skb_try_recv_datagram);
struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned int flags, struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned int flags,
void (*destructor)(struct sock *sk,
struct sk_buff *skb),
int *peeked, int *off, int *err) int *peeked, int *off, int *err)
{ {
struct sk_buff *skb, *last; struct sk_buff *skb, *last;
...@@ -270,8 +277,8 @@ struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned int flags, ...@@ -270,8 +277,8 @@ struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned int flags,
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
do { do {
skb = __skb_try_recv_datagram(sk, flags, peeked, off, err, skb = __skb_try_recv_datagram(sk, flags, destructor, peeked,
&last); off, err, &last);
if (skb) if (skb)
return skb; return skb;
...@@ -290,7 +297,7 @@ struct sk_buff *skb_recv_datagram(struct sock *sk, unsigned int flags, ...@@ -290,7 +297,7 @@ struct sk_buff *skb_recv_datagram(struct sock *sk, unsigned int flags,
int peeked, off = 0; int peeked, off = 0;
return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0), return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
&peeked, &off, err); NULL, &peeked, &off, err);
} }
EXPORT_SYMBOL(skb_recv_datagram); EXPORT_SYMBOL(skb_recv_datagram);
......
...@@ -1173,26 +1173,26 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset, ...@@ -1173,26 +1173,26 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset,
return ret; return ret;
} }
/* fully reclaim rmem/fwd memory allocated for skb */
static void udp_rmem_release(struct sock *sk, int size, int partial) static void udp_rmem_release(struct sock *sk, int size, int partial)
{ {
int amt; int amt;
atomic_sub(size, &sk->sk_rmem_alloc); atomic_sub(size, &sk->sk_rmem_alloc);
spin_lock_bh(&sk->sk_receive_queue.lock);
sk->sk_forward_alloc += size; sk->sk_forward_alloc += size;
amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1); amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
sk->sk_forward_alloc -= amt; sk->sk_forward_alloc -= amt;
spin_unlock_bh(&sk->sk_receive_queue.lock);
if (amt) if (amt)
__sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT); __sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);
} }
static void udp_rmem_free(struct sk_buff *skb) /* Note: called with sk_receive_queue.lock held */
void udp_skb_destructor(struct sock *sk, struct sk_buff *skb)
{ {
udp_rmem_release(skb->sk, skb->truesize, 1); udp_rmem_release(sk, skb->truesize, 1);
} }
EXPORT_SYMBOL(udp_skb_destructor);
int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb) int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
{ {
...@@ -1229,9 +1229,9 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb) ...@@ -1229,9 +1229,9 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
sk->sk_forward_alloc -= size; sk->sk_forward_alloc -= size;
/* the skb owner in now the udp socket */ /* no need to setup a destructor, we will explicitly release the
skb->sk = sk; * forward allocated memory on dequeue
skb->destructor = udp_rmem_free; */
skb->dev = NULL; skb->dev = NULL;
sock_skb_set_dropcount(sk, skb); sock_skb_set_dropcount(sk, skb);
...@@ -1255,8 +1255,15 @@ EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb); ...@@ -1255,8 +1255,15 @@ EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb);
static void udp_destruct_sock(struct sock *sk) static void udp_destruct_sock(struct sock *sk)
{ {
/* reclaim completely the forward allocated memory */ /* reclaim completely the forward allocated memory */
__skb_queue_purge(&sk->sk_receive_queue); unsigned int total = 0;
udp_rmem_release(sk, 0, 0); struct sk_buff *skb;
while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
total += skb->truesize;
kfree_skb(skb);
}
udp_rmem_release(sk, total, 0);
inet_sock_destruct(sk); inet_sock_destruct(sk);
} }
...@@ -1288,12 +1295,11 @@ EXPORT_SYMBOL_GPL(skb_consume_udp); ...@@ -1288,12 +1295,11 @@ EXPORT_SYMBOL_GPL(skb_consume_udp);
*/ */
static int first_packet_length(struct sock *sk) static int first_packet_length(struct sock *sk)
{ {
struct sk_buff_head list_kill, *rcvq = &sk->sk_receive_queue; struct sk_buff_head *rcvq = &sk->sk_receive_queue;
struct sk_buff *skb; struct sk_buff *skb;
int total = 0;
int res; int res;
__skb_queue_head_init(&list_kill);
spin_lock_bh(&rcvq->lock); spin_lock_bh(&rcvq->lock);
while ((skb = skb_peek(rcvq)) != NULL && while ((skb = skb_peek(rcvq)) != NULL &&
udp_lib_checksum_complete(skb)) { udp_lib_checksum_complete(skb)) {
...@@ -1303,12 +1309,13 @@ static int first_packet_length(struct sock *sk) ...@@ -1303,12 +1309,13 @@ static int first_packet_length(struct sock *sk)
IS_UDPLITE(sk)); IS_UDPLITE(sk));
atomic_inc(&sk->sk_drops); atomic_inc(&sk->sk_drops);
__skb_unlink(skb, rcvq); __skb_unlink(skb, rcvq);
__skb_queue_tail(&list_kill, skb); total += skb->truesize;
kfree_skb(skb);
} }
res = skb ? skb->len : -1; res = skb ? skb->len : -1;
if (total)
udp_rmem_release(sk, total, 1);
spin_unlock_bh(&rcvq->lock); spin_unlock_bh(&rcvq->lock);
__skb_queue_purge(&list_kill);
return res; return res;
} }
...@@ -1363,8 +1370,7 @@ int udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int noblock, ...@@ -1363,8 +1370,7 @@ int udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int noblock,
try_again: try_again:
peeking = off = sk_peek_offset(sk, flags); peeking = off = sk_peek_offset(sk, flags);
skb = __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0), skb = __skb_recv_udp(sk, flags, noblock, &peeked, &off, &err);
&peeked, &off, &err);
if (!skb) if (!skb)
return err; return err;
......
...@@ -343,8 +343,7 @@ int udpv6_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, ...@@ -343,8 +343,7 @@ int udpv6_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
try_again: try_again:
peeking = off = sk_peek_offset(sk, flags); peeking = off = sk_peek_offset(sk, flags);
skb = __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0), skb = __skb_recv_udp(sk, flags, noblock, &peeked, &off, &err);
&peeked, &off, &err);
if (!skb) if (!skb)
return err; return err;
......
...@@ -1053,7 +1053,7 @@ void rxrpc_data_ready(struct sock *udp_sk) ...@@ -1053,7 +1053,7 @@ void rxrpc_data_ready(struct sock *udp_sk)
ASSERT(!irqs_disabled()); ASSERT(!irqs_disabled());
skb = skb_recv_datagram(udp_sk, 0, 1, &ret); skb = skb_recv_udp(udp_sk, 0, 1, &ret);
if (!skb) { if (!skb) {
if (ret == -EAGAIN) if (ret == -EAGAIN)
return; return;
...@@ -1075,10 +1075,9 @@ void rxrpc_data_ready(struct sock *udp_sk) ...@@ -1075,10 +1075,9 @@ void rxrpc_data_ready(struct sock *udp_sk)
__UDP_INC_STATS(&init_net, UDP_MIB_INDATAGRAMS, 0); __UDP_INC_STATS(&init_net, UDP_MIB_INDATAGRAMS, 0);
/* The socket buffer we have is owned by UDP, with UDP's data all over /* The UDP protocol already released all skb resources;
* it, but we really want our own data there. * we are free to add our own data there.
*/ */
skb_orphan(skb);
sp = rxrpc_skb(skb); sp = rxrpc_skb(skb);
/* dig out the RxRPC connection details */ /* dig out the RxRPC connection details */
......
...@@ -547,7 +547,7 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp) ...@@ -547,7 +547,7 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
err = kernel_recvmsg(svsk->sk_sock, &msg, NULL, err = kernel_recvmsg(svsk->sk_sock, &msg, NULL,
0, 0, MSG_PEEK | MSG_DONTWAIT); 0, 0, MSG_PEEK | MSG_DONTWAIT);
if (err >= 0) if (err >= 0)
skb = skb_recv_datagram(svsk->sk_sk, 0, 1, &err); skb = skb_recv_udp(svsk->sk_sk, 0, 1, &err);
if (skb == NULL) { if (skb == NULL) {
if (err != -EAGAIN) { if (err != -EAGAIN) {
......
...@@ -1080,7 +1080,7 @@ static void xs_udp_data_receive(struct sock_xprt *transport) ...@@ -1080,7 +1080,7 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
if (sk == NULL) if (sk == NULL)
goto out; goto out;
for (;;) { for (;;) {
skb = skb_recv_datagram(sk, 0, 1, &err); skb = skb_recv_udp(sk, 0, 1, &err);
if (skb != NULL) { if (skb != NULL) {
xs_udp_data_read_skb(&transport->xprt, sk, skb); xs_udp_data_read_skb(&transport->xprt, sk, skb);
consume_skb(skb); consume_skb(skb);
......
...@@ -2113,8 +2113,8 @@ static int unix_dgram_recvmsg(struct socket *sock, struct msghdr *msg, ...@@ -2113,8 +2113,8 @@ static int unix_dgram_recvmsg(struct socket *sock, struct msghdr *msg,
mutex_lock(&u->iolock); mutex_lock(&u->iolock);
skip = sk_peek_offset(sk, flags); skip = sk_peek_offset(sk, flags);
skb = __skb_try_recv_datagram(sk, flags, &peeked, &skip, &err, skb = __skb_try_recv_datagram(sk, flags, NULL, &peeked, &skip,
&last); &err, &last);
if (skb) if (skb)
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册