diff --git a/include/uapi/linux/errqueue.h b/include/uapi/linux/errqueue.h index dc64cfaf13da08564a8271e50a4edb89d221b148..28812eda420908ebf73008525d90c1abb3907536 100644 --- a/include/uapi/linux/errqueue.h +++ b/include/uapi/linux/errqueue.h @@ -20,11 +20,13 @@ struct sock_extended_err { #define SO_EE_ORIGIN_ICMP6 3 #define SO_EE_ORIGIN_TXSTATUS 4 #define SO_EE_ORIGIN_ZEROCOPY 5 +#define SO_EE_ORIGIN_ZCOOKIE 6 #define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS #define SO_EE_OFFENDER(ee) ((struct sockaddr*)((ee)+1)) #define SO_EE_CODE_ZEROCOPY_COPIED 1 +#define SO_EE_ORIGIN_MAX_ZCOOKIES 8 /** * struct scm_timestamping - timestamps exposed through cmsg diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index 0a8eefd256b3e7af8e4cb58a4729924752a4391b..a937f18896aee6c0fce229b528d77b9ed33ae05c 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -182,6 +182,8 @@ static __poll_t rds_poll(struct file *file, struct socket *sock, mask |= (EPOLLIN | EPOLLRDNORM); if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) mask |= (EPOLLOUT | EPOLLWRNORM); + if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue)) + mask |= POLLERR; read_unlock_irqrestore(&rs->rs_recv_lock, flags); /* clear state any time we wake a seen-congested socket */ diff --git a/net/rds/message.c b/net/rds/message.c index ef3daafa3d79e9acb502d431fb732ce808c0c131..bf1a656b198a06eeae26db7c760d6173d87950a8 100644 --- a/net/rds/message.c +++ b/net/rds/message.c @@ -33,6 +33,9 @@ #include #include #include +#include +#include +#include #include "rds.h" @@ -53,29 +56,95 @@ void rds_message_addref(struct rds_message *rm) } EXPORT_SYMBOL_GPL(rds_message_addref); +static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie) +{ + struct sock_exterr_skb *serr = SKB_EXT_ERR(skb); + int ncookies; + u32 *ptr; + + if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE) + return false; + ncookies = serr->ee.ee_data; + if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES) + return false; + ptr = skb_put(skb, sizeof(u32)); + *ptr = cookie; + serr->ee.ee_data = ++ncookies; + return true; +} + +static void rds_rm_zerocopy_callback(struct rds_sock *rs, + struct rds_znotifier *znotif) +{ + struct sock *sk = rds_rs_to_sk(rs); + struct sk_buff *skb, *tail; + struct sock_exterr_skb *serr; + unsigned long flags; + struct sk_buff_head *q; + u32 cookie = znotif->z_cookie; + + q = &sk->sk_error_queue; + spin_lock_irqsave(&q->lock, flags); + tail = skb_peek_tail(q); + + if (tail && skb_zcookie_add(tail, cookie)) { + spin_unlock_irqrestore(&q->lock, flags); + mm_unaccount_pinned_pages(&znotif->z_mmp); + consume_skb(rds_skb_from_znotifier(znotif)); + sk->sk_error_report(sk); + return; + } + + skb = rds_skb_from_znotifier(znotif); + serr = SKB_EXT_ERR(skb); + memset(&serr->ee, 0, sizeof(serr->ee)); + serr->ee.ee_errno = 0; + serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE; + serr->ee.ee_info = 0; + WARN_ON(!skb_zcookie_add(skb, cookie)); + + __skb_queue_tail(q, skb); + + spin_unlock_irqrestore(&q->lock, flags); + sk->sk_error_report(sk); + + mm_unaccount_pinned_pages(&znotif->z_mmp); +} + /* * This relies on dma_map_sg() not touching sg[].page during merging. */ static void rds_message_purge(struct rds_message *rm) { unsigned long i, flags; + bool zcopy = false; if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags))) return; - for (i = 0; i < rm->data.op_nents; i++) { - rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i])); - /* XXX will have to put_page for page refs */ - __free_page(sg_page(&rm->data.op_sg[i])); - } - rm->data.op_nents = 0; spin_lock_irqsave(&rm->m_rs_lock, flags); if (rm->m_rs) { - sock_put(rds_rs_to_sk(rm->m_rs)); + struct rds_sock *rs = rm->m_rs; + + if (rm->data.op_mmp_znotifier) { + zcopy = true; + rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier); + rm->data.op_mmp_znotifier = NULL; + } + sock_put(rds_rs_to_sk(rs)); rm->m_rs = NULL; } spin_unlock_irqrestore(&rm->m_rs_lock, flags); + for (i = 0; i < rm->data.op_nents; i++) { + /* XXX will have to put_page for page refs */ + if (!zcopy) + __free_page(sg_page(&rm->data.op_sg[i])); + else + put_page(sg_page(&rm->data.op_sg[i])); + } + rm->data.op_nents = 0; + if (rm->rdma.op_active) rds_rdma_free_op(&rm->rdma); if (rm->rdma.op_rdma_mr) diff --git a/net/rds/rds.h b/net/rds/rds.h index 7301b9b01890ed170114550c28e8b0433cf3da7b..24576bc4a5e93d70445e778e2e0fdf40af0df145 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -356,6 +356,19 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie) #define RDS_MSG_PAGEVEC 7 #define RDS_MSG_FLUSH 8 +struct rds_znotifier { + struct list_head z_list; + struct mmpin z_mmp; + u32 z_cookie; +}; + +#define RDS_ZCOPY_SKB(__skb) ((struct rds_znotifier *)&((__skb)->cb[0])) + +static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z) +{ + return container_of((void *)z, struct sk_buff, cb); +} + struct rds_message { refcount_t m_refcount; struct list_head m_sock_item; @@ -436,6 +449,7 @@ struct rds_message { unsigned int op_count; unsigned int op_dmasg; unsigned int op_dmaoff; + struct rds_znotifier *op_mmp_znotifier; struct scatterlist *op_sg; } data; }; diff --git a/net/rds/recv.c b/net/rds/recv.c index b25bcfe411ca65935ef9e2417a836d2575023cdd..b080961464df71ac048e971a544b32f3be5e7b48 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -594,6 +594,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, if (msg_flags & MSG_OOB) goto out; + if (msg_flags & MSG_ERRQUEUE) + return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR); while (1) { /* If there are pending notifications, do those - and nothing else */