diff --git a/include/uapi/linux/rds.h b/include/uapi/linux/rds.h index 0f9265cb2a96999f92d415a5cca12af237451433..3833113ab2c049080bf24008c2c7b50bc393de81 100644 --- a/include/uapi/linux/rds.h +++ b/include/uapi/linux/rds.h @@ -52,6 +52,13 @@ #define RDS_GET_MR_FOR_DEST 7 #define SO_RDS_TRANSPORT 8 +/* Socket option to tap receive path latency + * SO_RDS: SO_RDS_MSG_RXPATH_LATENCY + * Format used struct rds_rx_trace_so + */ +#define SO_RDS_MSG_RXPATH_LATENCY 10 + + /* supported values for SO_RDS_TRANSPORT */ #define RDS_TRANS_IB 0 #define RDS_TRANS_IWARP 1 @@ -77,6 +84,12 @@ * the same as for the GET_MR setsockopt. * RDS_CMSG_RDMA_STATUS (recvmsg) * Returns the status of a completed RDMA operation. + * RDS_CMSG_RXPATH_LATENCY(recvmsg) + * Returns rds message latencies in various stages of receive + * path in nS. It is set per socket using SO_RDS_MSG_RXPATH_LATENCY + * socket option. Legitimate points are defined in + * enum rds_message_rxpath_latency. More points can be added in + * future. CMSG format is struct rds_cmsg_rx_trace. */ #define RDS_CMSG_RDMA_ARGS 1 #define RDS_CMSG_RDMA_DEST 2 @@ -87,6 +100,7 @@ #define RDS_CMSG_ATOMIC_CSWP 7 #define RDS_CMSG_MASKED_ATOMIC_FADD 8 #define RDS_CMSG_MASKED_ATOMIC_CSWP 9 +#define RDS_CMSG_RXPATH_LATENCY 11 #define RDS_INFO_FIRST 10000 #define RDS_INFO_COUNTERS 10000 @@ -171,6 +185,27 @@ struct rds_info_rdma_connection { uint32_t rdma_mr_size; }; +/* RDS message Receive Path Latency points */ +enum rds_message_rxpath_latency { + RDS_MSG_RX_HDR_TO_DGRAM_START = 0, + RDS_MSG_RX_DGRAM_REASSEMBLE, + RDS_MSG_RX_DGRAM_DELIVERED, + RDS_MSG_RX_DGRAM_TRACE_MAX +}; + +/* SO_RDS_MSG_RXPATH_LATENCY argument: number of points and which ones */ +struct rds_rx_trace_so { + __u8 rx_traces; + __u8 rx_trace_pos[RDS_MSG_RX_DGRAM_TRACE_MAX]; +}; + +/* RDS_CMSG_RXPATH_LATENCY payload: rx_trace[i] belongs to rx_trace_pos[i] */ +struct rds_cmsg_rx_trace { + __u8 rx_traces; + __u8 rx_trace_pos[RDS_MSG_RX_DGRAM_TRACE_MAX]; + __u64 rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX]; +}; + /* * Congestion monitoring. 
* Congestion control in RDS happens at the host connection diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index 2ac1e6194be35fced1706ebf5351bedb7cf01411..fd8217404162ecd8136554e3de567d98bbbf80b2 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -298,6 +298,36 @@ static int rds_enable_recvtstamp(struct sock *sk, char __user *optval, return 0; } +/* Record which receive-path latency points this socket wants reported. */ +static int rds_recv_track_latency(struct rds_sock *rs, char __user *optval, + int optlen) +{ + struct rds_rx_trace_so trace; + int i; + + if (optlen != sizeof(struct rds_rx_trace_so)) + return -EFAULT; + + if (copy_from_user(&trace, optval, sizeof(trace))) + return -EFAULT; + + /* rx_traces bounds the loop over rs_rx_trace[]; reject oversized counts */ + if (trace.rx_traces > RDS_MSG_RX_DGRAM_TRACE_MAX) + return -EFAULT; + + rs->rs_rx_traces = trace.rx_traces; + for (i = 0; i < rs->rs_rx_traces; i++) { + /* positions are 0-based, so TRACE_MAX itself is not a valid point */ + if (trace.rx_trace_pos[i] >= RDS_MSG_RX_DGRAM_TRACE_MAX) { + rs->rs_rx_traces = 0; + return -EFAULT; + } + rs->rs_rx_trace[i] = trace.rx_trace_pos[i]; + } + + return 0; +} + static int rds_setsockopt(struct socket *sock, int level, int optname, char __user *optval, unsigned int optlen) { @@ -338,6 +368,9 @@ static int rds_setsockopt(struct socket *sock, int level, int optname, ret = rds_enable_recvtstamp(sock->sk, optval, optlen); release_sock(sock->sk); break; + case SO_RDS_MSG_RXPATH_LATENCY: + ret = rds_recv_track_latency(rs, optval, optlen); + break; default: ret = -ENOPROTOOPT; } @@ -484,6 +517,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol) INIT_LIST_HEAD(&rs->rs_cong_list); spin_lock_init(&rs->rs_rdma_lock); rs->rs_rdma_keys = RB_ROOT; + rs->rs_rx_traces = 0; spin_lock_bh(&rds_sock_lock); list_add_tail(&rs->rs_item, &rds_sock_list); diff --git a/net/rds/bind.c b/net/rds/bind.c index 095f6ce583fee33eee431dc88e04a26c01db2d4d..3a915bedb76c52995fdf5c09fa747f3a236a169b 100644 --- a/net/rds/bind.c +++ b/net/rds/bind.c @@ -176,8 +176,8 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len) if (!trans) { ret = -EADDRNOTAVAIL; rds_remove_bound(rs); - printk_ratelimited(KERN_INFO "RDS: rds_bind() could not find a 
transport, " - "load rds_tcp or rds_rdma?\n"); + pr_info_ratelimited("RDS: %s could not find a transport for %pI4, load rds_tcp or rds_rdma?\n", + __func__, &sin->sin_addr.s_addr); goto out; } diff --git a/net/rds/connection.c b/net/rds/connection.c index fe9d31c0b22d40ef8b45fd0f17cccbcfd893de16..0e04dcceb1d416438be8bb40fc68253f336f631d 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -545,11 +545,11 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len, } EXPORT_SYMBOL_GPL(rds_for_each_conn_info); -void rds_walk_conn_path_info(struct socket *sock, unsigned int len, - struct rds_info_iterator *iter, - struct rds_info_lengths *lens, - int (*visitor)(struct rds_conn_path *, void *), - size_t item_len) +static void rds_walk_conn_path_info(struct socket *sock, unsigned int len, + struct rds_info_iterator *iter, + struct rds_info_lengths *lens, + int (*visitor)(struct rds_conn_path *, void *), + size_t item_len) { u64 buffer[(item_len + 7) / 8]; struct hlist_head *head; diff --git a/net/rds/ib.c b/net/rds/ib.c index 5680d90b0b779ec41d019f1d0797dca7b5072ece..8d70884d7bb60294c1402892bff3ebe4c81d3663 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -111,6 +111,9 @@ static void rds_ib_dev_free(struct work_struct *work) kfree(i_ipaddr); } + /* kfree(NULL) is a no-op, so no NULL guard is needed here */ + kfree(rds_ibdev->vector_load); + kfree(rds_ibdev); } @@ -159,6 +162,14 @@ static void rds_ib_add_one(struct ib_device *device) rds_ibdev->max_initiator_depth = device->attrs.max_qp_init_rd_atom; rds_ibdev->max_responder_resources = device->attrs.max_qp_rd_atom; + rds_ibdev->vector_load = kcalloc(device->num_comp_vectors, sizeof(int), + GFP_KERNEL); + if (!rds_ibdev->vector_load) { + pr_err("RDS/IB: %s failed to allocate vector memory\n", + __func__); + goto put_dev; + } + rds_ibdev->dev = device; rds_ibdev->pd = ib_alloc_pd(device, 0); if (IS_ERR(rds_ibdev->pd)) { diff --git a/net/rds/ib.h b/net/rds/ib.h index 
45ac8e8e58f412267e134bb5ae1fab2b59809c09..540458928f3c8eab4529b4072826b882563c941e 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -14,9 +14,10 @@ #define RDS_IB_DEFAULT_RECV_WR 1024 #define RDS_IB_DEFAULT_SEND_WR 256 -#define RDS_IB_DEFAULT_FR_WR 512 +#define RDS_IB_DEFAULT_FR_WR 256 +#define RDS_IB_DEFAULT_FR_INV_WR 256 -#define RDS_IB_DEFAULT_RETRY_COUNT 2 +#define RDS_IB_DEFAULT_RETRY_COUNT 1 #define RDS_IB_SUPPORTED_PROTOCOLS 0x00000003 /* minor versions supported */ @@ -125,6 +126,7 @@ struct rds_ib_connection { /* To control the number of wrs from fastreg */ atomic_t i_fastreg_wrs; + atomic_t i_fastunreg_wrs; /* interrupt handling */ struct tasklet_struct i_send_tasklet; @@ -149,6 +151,7 @@ struct rds_ib_connection { u64 i_ack_recv; /* last ACK received */ struct rds_ib_refill_cache i_cache_incs; struct rds_ib_refill_cache i_cache_frags; + atomic_t i_cache_allocs; /* sending acks */ unsigned long i_ack_flags; @@ -179,6 +182,14 @@ struct rds_ib_connection { /* Batched completions */ unsigned int i_unsignaled_wrs; + + /* Endpoint role in connection */ + bool i_active_side; + atomic_t i_cq_quiesce; + + /* Send/Recv vectors */ + int i_scq_vector; + int i_rcq_vector; }; /* This assumes that atomic_t is at least 32 bits */ @@ -221,6 +232,7 @@ struct rds_ib_device { spinlock_t spinlock; /* protect the above */ atomic_t refcount; struct work_struct free_work; + int *vector_load; }; #define ibdev_to_node(ibdev) dev_to_node(ibdev->dma_device) @@ -249,6 +261,8 @@ struct rds_ib_statistics { uint64_t s_ib_rx_refill_from_cq; uint64_t s_ib_rx_refill_from_thread; uint64_t s_ib_rx_alloc_limit; + uint64_t s_ib_rx_total_frags; + uint64_t s_ib_rx_total_incs; uint64_t s_ib_rx_credit_updates; uint64_t s_ib_ack_sent; uint64_t s_ib_ack_send_failure; @@ -271,6 +285,8 @@ struct rds_ib_statistics { uint64_t s_ib_rdma_mr_1m_reused; uint64_t s_ib_atomic_cswp; uint64_t s_ib_atomic_fadd; + uint64_t s_ib_recv_added_to_cache; + uint64_t s_ib_recv_removed_from_cache; }; extern struct 
workqueue_struct *rds_ib_wq; @@ -401,6 +417,8 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op); /* ib_stats.c */ DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats); #define rds_ib_stats_inc(member) rds_stats_inc_which(rds_ib_stats, member) +#define rds_ib_stats_add(member, count) \ + rds_stats_add_which(rds_ib_stats, member, count) unsigned int rds_ib_stats_info_copy(struct rds_info_iterator *iter, unsigned int avail); diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 5b2ab95afa072f4970e25bbc856e916fe28187aa..ce3775abc6e7a1d30e335aaea749a9840e949786 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -113,24 +113,26 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even } if (conn->c_version < RDS_PROTOCOL(3, 1)) { - printk(KERN_NOTICE "RDS/IB: Connection to %pI4 version %u.%u failed," - " no longer supported\n", - &conn->c_faddr, - RDS_PROTOCOL_MAJOR(conn->c_version), - RDS_PROTOCOL_MINOR(conn->c_version)); + pr_notice("RDS/IB: Connection <%pI4,%pI4> version %u.%u no longer supported\n", + &conn->c_laddr, &conn->c_faddr, + RDS_PROTOCOL_MAJOR(conn->c_version), + RDS_PROTOCOL_MINOR(conn->c_version)); rds_conn_destroy(conn); return; } else { - printk(KERN_NOTICE "RDS/IB: connected to %pI4 version %u.%u%s\n", - &conn->c_faddr, - RDS_PROTOCOL_MAJOR(conn->c_version), - RDS_PROTOCOL_MINOR(conn->c_version), - ic->i_flowctl ? ", flow control" : ""); + pr_notice("RDS/IB: %s conn connected <%pI4,%pI4> version %u.%u%s\n", + ic->i_active_side ? "Active" : "Passive", + &conn->c_laddr, &conn->c_faddr, + RDS_PROTOCOL_MAJOR(conn->c_version), + RDS_PROTOCOL_MINOR(conn->c_version), + ic->i_flowctl ? ", flow control" : ""); } - /* - * Init rings and fill recv. this needs to wait until protocol negotiation - * is complete, since ring layout is different from 3.0 to 3.1. + atomic_set(&ic->i_cq_quiesce, 0); + + /* Init rings and fill recv. 
this needs to wait until protocol + * negotiation is complete, since ring layout is different + * from 3.1 to 4.1. */ rds_ib_send_init_ring(ic); rds_ib_recv_init_ring(ic); @@ -267,6 +269,10 @@ static void rds_ib_tasklet_fn_send(unsigned long data) rds_ib_stats_inc(s_ib_tasklet_call); + /* if cq has been already reaped, ignore incoming cq event */ + if (atomic_read(&ic->i_cq_quiesce)) + return; + poll_scq(ic, ic->i_send_cq, ic->i_send_wc); ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP); poll_scq(ic, ic->i_send_cq, ic->i_send_wc); @@ -308,6 +314,10 @@ static void rds_ib_tasklet_fn_recv(unsigned long data) rds_ib_stats_inc(s_ib_tasklet_call); + /* if cq has been already reaped, ignore incoming cq event */ + if (atomic_read(&ic->i_cq_quiesce)) + return; + memset(&state, 0, sizeof(state)); poll_rcq(ic, ic->i_recv_cq, ic->i_recv_wc, &state); ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED); @@ -358,6 +368,28 @@ static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context) tasklet_schedule(&ic->i_send_tasklet); } +static inline int ibdev_get_unused_vector(struct rds_ib_device *rds_ibdev) +{ + int min = rds_ibdev->vector_load[rds_ibdev->dev->num_comp_vectors - 1]; + int index = rds_ibdev->dev->num_comp_vectors - 1; + int i; + + for (i = rds_ibdev->dev->num_comp_vectors - 1; i >= 0; i--) { + if (rds_ibdev->vector_load[i] < min) { + index = i; + min = rds_ibdev->vector_load[i]; + } + } + + rds_ibdev->vector_load[index]++; + return index; +} + +static inline void ibdev_put_vector(struct rds_ib_device *rds_ibdev, int index) +{ + rds_ibdev->vector_load[index]--; +} + /* * This needs to be very careful to not leave IS_ERR pointers around for * cleanup to trip over. @@ -383,7 +415,10 @@ static int rds_ib_setup_qp(struct rds_connection *conn) * completion queue and send queue. This extra space is used for FRMR * registration and invalidation work requests */ - fr_queue_space = (rds_ibdev->use_fastreg ? 
RDS_IB_DEFAULT_FR_WR : 0); + fr_queue_space = rds_ibdev->use_fastreg ? + (RDS_IB_DEFAULT_FR_WR + 1) + + (RDS_IB_DEFAULT_FR_INV_WR + 1) + : 0; /* add the conn now so that connection establishment has the dev */ rds_ib_add_conn(rds_ibdev, conn); @@ -396,25 +431,30 @@ static int rds_ib_setup_qp(struct rds_connection *conn) /* Protection domain and memory range */ ic->i_pd = rds_ibdev->pd; + ic->i_scq_vector = ibdev_get_unused_vector(rds_ibdev); cq_attr.cqe = ic->i_send_ring.w_nr + fr_queue_space + 1; - + cq_attr.comp_vector = ic->i_scq_vector; ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send, rds_ib_cq_event_handler, conn, &cq_attr); if (IS_ERR(ic->i_send_cq)) { ret = PTR_ERR(ic->i_send_cq); ic->i_send_cq = NULL; + ibdev_put_vector(rds_ibdev, ic->i_scq_vector); rdsdebug("ib_create_cq send failed: %d\n", ret); goto out; } + ic->i_rcq_vector = ibdev_get_unused_vector(rds_ibdev); cq_attr.cqe = ic->i_recv_ring.w_nr; + cq_attr.comp_vector = ic->i_rcq_vector; ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv, rds_ib_cq_event_handler, conn, &cq_attr); if (IS_ERR(ic->i_recv_cq)) { ret = PTR_ERR(ic->i_recv_cq); ic->i_recv_cq = NULL; + ibdev_put_vector(rds_ibdev, ic->i_rcq_vector); rdsdebug("ib_create_cq recv failed: %d\n", ret); goto out; } @@ -445,6 +485,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn) attr.send_cq = ic->i_send_cq; attr.recv_cq = ic->i_recv_cq; atomic_set(&ic->i_fastreg_wrs, RDS_IB_DEFAULT_FR_WR); + atomic_set(&ic->i_fastunreg_wrs, RDS_IB_DEFAULT_FR_INV_WR); /* * XXX this can fail if max_*_wr is too large? 
Are we supposed @@ -682,6 +723,7 @@ int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id) if (ic->i_cm_id == cm_id) ret = 0; } + ic->i_active_side = true; return ret; } @@ -767,17 +809,27 @@ void rds_ib_conn_path_shutdown(struct rds_conn_path *cp) wait_event(rds_ib_ring_empty_wait, rds_ib_ring_empty(&ic->i_recv_ring) && (atomic_read(&ic->i_signaled_sends) == 0) && - (atomic_read(&ic->i_fastreg_wrs) == RDS_IB_DEFAULT_FR_WR)); + (atomic_read(&ic->i_fastreg_wrs) == RDS_IB_DEFAULT_FR_WR) && + (atomic_read(&ic->i_fastunreg_wrs) == RDS_IB_DEFAULT_FR_INV_WR)); tasklet_kill(&ic->i_send_tasklet); tasklet_kill(&ic->i_recv_tasklet); + atomic_set(&ic->i_cq_quiesce, 1); + /* first destroy the ib state that generates callbacks */ if (ic->i_cm_id->qp) rdma_destroy_qp(ic->i_cm_id); - if (ic->i_send_cq) + if (ic->i_send_cq) { + if (ic->rds_ibdev) + ibdev_put_vector(ic->rds_ibdev, ic->i_scq_vector); ib_destroy_cq(ic->i_send_cq); - if (ic->i_recv_cq) + } + + if (ic->i_recv_cq) { + if (ic->rds_ibdev) + ibdev_put_vector(ic->rds_ibdev, ic->i_rcq_vector); ib_destroy_cq(ic->i_recv_cq); + } /* then free the resources that ib callbacks use */ if (ic->i_send_hdrs) @@ -855,6 +907,7 @@ void rds_ib_conn_path_shutdown(struct rds_conn_path *cp) ic->i_sends = NULL; vfree(ic->i_recvs); ic->i_recvs = NULL; + ic->i_active_side = false; } int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) diff --git a/net/rds/ib_frmr.c b/net/rds/ib_frmr.c index d921adc62765dba0bbaec9a7489227348405082e..48332a6ed7383c51def7402dcfef1e581fa677f7 100644 --- a/net/rds/ib_frmr.c +++ b/net/rds/ib_frmr.c @@ -104,14 +104,15 @@ static int rds_ib_post_reg_frmr(struct rds_ib_mr *ibmr) struct rds_ib_frmr *frmr = &ibmr->u.frmr; struct ib_send_wr *failed_wr; struct ib_reg_wr reg_wr; - int ret; + int ret, off = 0; while (atomic_dec_return(&ibmr->ic->i_fastreg_wrs) <= 0) { atomic_inc(&ibmr->ic->i_fastreg_wrs); cpu_relax(); } - ret = ib_map_mr_sg_zbva(frmr->mr, ibmr->sg, ibmr->sg_len, 0, PAGE_SIZE); + ret = 
ib_map_mr_sg_zbva(frmr->mr, ibmr->sg, ibmr->sg_len, + &off, PAGE_SIZE); if (unlikely(ret != ibmr->sg_len)) return ret < 0 ? ret : -EINVAL; @@ -240,8 +241,8 @@ static int rds_ib_post_inv(struct rds_ib_mr *ibmr) if (frmr->fr_state != FRMR_IS_INUSE) goto out; - while (atomic_dec_return(&ibmr->ic->i_fastreg_wrs) <= 0) { - atomic_inc(&ibmr->ic->i_fastreg_wrs); + while (atomic_dec_return(&ibmr->ic->i_fastunreg_wrs) <= 0) { + atomic_inc(&ibmr->ic->i_fastunreg_wrs); cpu_relax(); } @@ -260,7 +261,7 @@ static int rds_ib_post_inv(struct rds_ib_mr *ibmr) if (unlikely(ret)) { frmr->fr_state = FRMR_IS_STALE; frmr->fr_inv = false; - atomic_inc(&ibmr->ic->i_fastreg_wrs); + atomic_inc(&ibmr->ic->i_fastunreg_wrs); pr_err("RDS/IB: %s returned error(%d)\n", __func__, ret); goto out; } @@ -288,9 +289,11 @@ void rds_ib_mr_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc) if (frmr->fr_inv) { frmr->fr_state = FRMR_IS_FREE; frmr->fr_inv = false; + /* return the credit to the pool rds_ib_post_inv() took it from */ + atomic_inc(&ic->i_fastunreg_wrs); + } else { + atomic_inc(&ic->i_fastreg_wrs); } - - atomic_inc(&ic->i_fastreg_wrs); } void rds_ib_unreg_frmr(struct list_head *list, unsigned int *nfreed, diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c index 606a11f681d28b18879629e758b89f8a9ae8b33f..e10624aa6959b596a2629a9f18bb25504428545f 100644 --- a/net/rds/ib_recv.c +++ b/net/rds/ib_recv.c @@ -194,6 +194,8 @@ static void rds_ib_frag_free(struct rds_ib_connection *ic, rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg)); rds_ib_recv_cache_put(&frag->f_cache_entry, &ic->i_cache_frags); + atomic_add(RDS_FRAG_SIZE / SZ_1K, &ic->i_cache_allocs); + rds_ib_stats_add(s_ib_recv_added_to_cache, RDS_FRAG_SIZE); } /* Recycle inc after freeing attached frags */ @@ -261,6 +263,7 @@ static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *i atomic_dec(&rds_ib_allocation); return NULL; } + rds_ib_stats_inc(s_ib_rx_total_incs); } INIT_LIST_HEAD(&ibinc->ii_frags); rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr); @@ -278,6 
+281,8 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic cache_item = rds_ib_recv_cache_get(&ic->i_cache_frags); if (cache_item) { frag = container_of(cache_item, struct rds_page_frag, f_cache_entry); + atomic_sub(RDS_FRAG_SIZE / SZ_1K, &ic->i_cache_allocs); + rds_ib_stats_add(s_ib_recv_removed_from_cache, RDS_FRAG_SIZE); } else { frag = kmem_cache_alloc(rds_ib_frag_slab, slab_mask); if (!frag) @@ -290,6 +295,7 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic kmem_cache_free(rds_ib_frag_slab, frag); return NULL; } + rds_ib_stats_inc(s_ib_rx_total_frags); } INIT_LIST_HEAD(&frag->f_item); @@ -905,8 +911,12 @@ static void rds_ib_process_recv(struct rds_connection *conn, ic->i_ibinc = ibinc; hdr = &ibinc->ii_inc.i_hdr; + ibinc->ii_inc.i_rx_lat_trace[RDS_MSG_RX_HDR] = + local_clock(); memcpy(hdr, ihdr, sizeof(*hdr)); ic->i_recv_data_rem = be32_to_cpu(hdr->h_len); + ibinc->ii_inc.i_rx_lat_trace[RDS_MSG_RX_START] = + local_clock(); rdsdebug("ic %p ibinc %p rem %u flag 0x%x\n", ic, ibinc, ic->i_recv_data_rem, hdr->h_flags); @@ -980,8 +990,8 @@ void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, } else { /* We expect errors as the qp is drained during shutdown */ if (rds_conn_up(conn) || rds_conn_connecting(conn)) - rds_ib_conn_error(conn, "recv completion on %pI4 had status %u (%s), disconnecting and reconnecting\n", - &conn->c_faddr, + rds_ib_conn_error(conn, "recv completion on <%pI4,%pI4> had status %u (%s), disconnecting and reconnecting\n", + &conn->c_laddr, &conn->c_faddr, wc->status, ib_wc_status_msg(wc->status)); } diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index 84d90c97332f9178552f34f2e13973a351f6213a..5e72de10c484d5b080f0e583be5a309004f35fc2 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -69,16 +69,6 @@ static void rds_ib_send_complete(struct rds_message *rm, complete(rm, notify_status); } -static void rds_ib_send_unmap_data(struct rds_ib_connection *ic, - struct 
rm_data_op *op, - int wc_status) -{ - if (op->op_nents) - ib_dma_unmap_sg(ic->i_cm_id->device, - op->op_sg, op->op_nents, - DMA_TO_DEVICE); -} - static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic, struct rm_rdma_op *op, int wc_status) @@ -139,6 +129,21 @@ static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic, rds_ib_stats_inc(s_ib_atomic_fadd); } +static void rds_ib_send_unmap_data(struct rds_ib_connection *ic, + struct rm_data_op *op, + int wc_status) +{ + struct rds_message *rm = container_of(op, struct rds_message, data); + + if (op->op_nents) + ib_dma_unmap_sg(ic->i_cm_id->device, + op->op_sg, op->op_nents, + DMA_TO_DEVICE); + + if (rm->rdma.op_active && rm->data.op_notify) + rds_ib_send_unmap_rdma(ic, &rm->rdma, wc_status); +} + /* * Unmap the resources associated with a struct send_work. * @@ -300,8 +305,8 @@ void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc) /* We expect errors as the qp is drained during shutdown */ if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) { - rds_ib_conn_error(conn, "send completion on %pI4 had status %u (%s), disconnecting and reconnecting\n", - &conn->c_faddr, wc->status, + rds_ib_conn_error(conn, "send completion on <%pI4,%pI4> had status %u (%s), disconnecting and reconnecting\n", + &conn->c_laddr, &conn->c_faddr, wc->status, ib_wc_status_msg(wc->status)); } } diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c index 7e78dca1f252c671741443cdf660f38da6c575a0..9252ad126335971fa202e5aee6e2babb55d969e0 100644 --- a/net/rds/ib_stats.c +++ b/net/rds/ib_stats.c @@ -55,6 +55,8 @@ static const char *const rds_ib_stat_names[] = { "ib_rx_refill_from_cq", "ib_rx_refill_from_thread", "ib_rx_alloc_limit", + "ib_rx_total_frags", + "ib_rx_total_incs", "ib_rx_credit_updates", "ib_ack_sent", "ib_ack_send_failure", diff --git a/net/rds/rdma.c b/net/rds/rdma.c index ea961144084fadb3ee98abd708ce5cd31eba42a5..f06fac4886b090f346090ac26dad8e1eaa8da471 100644 --- a/net/rds/rdma.c +++ 
b/net/rds/rdma.c @@ -40,7 +40,6 @@ /* * XXX * - build with sparse - * - should we limit the size of a mr region? let transport return failure? * - should we detect duplicate keys on a socket? hmm. * - an rdma is an mlock, apply rlimit? */ @@ -200,6 +199,14 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args, goto out; } + /* Restrict the size of mr irrespective of underlying transport + * To account for unaligned mr regions, subtract one from nr_pages + */ + if ((nr_pages - 1) > (RDS_MAX_MSG_SIZE >> PAGE_SHIFT)) { + ret = -EMSGSIZE; + goto out; + } + rdsdebug("RDS: get_mr addr %llx len %llu nr_pages %u\n", args->vec.addr, args->vec.bytes, nr_pages); @@ -415,7 +422,8 @@ void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force) spin_lock_irqsave(&rs->rs_rdma_lock, flags); mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL); if (!mr) { - printk(KERN_ERR "rds: trying to unuse MR with unknown r_key %u!\n", r_key); + pr_debug("rds: trying to unuse MR with unknown r_key %u!\n", + r_key); spin_unlock_irqrestore(&rs->rs_rdma_lock, flags); return; } @@ -626,6 +634,16 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm, } op->op_notifier->n_user_token = args->user_token; op->op_notifier->n_status = RDS_RDMA_SUCCESS; + + /* Enable rmda notification on data operation for composite + * rds messages and make sure notification is enabled only + * for the data operation which follows it so that application + * gets notified only after full message gets delivered. 
+ */ + if (rm->data.op_sg) { + rm->rdma.op_notify = 0; + rm->data.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME); + } } /* The cookie contains the R_Key of the remote memory region, and diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c index d5f3117671575f84550c4bee7b4aef7753bd9f59..fc59821f0a27bd2a529c17a36c5b06a1d5d91a8e 100644 --- a/net/rds/rdma_transport.c +++ b/net/rds/rdma_transport.c @@ -206,18 +206,13 @@ static int rds_rdma_init(void) { int ret; - ret = rds_rdma_listen_init(); + ret = rds_ib_init(); if (ret) goto out; - ret = rds_ib_init(); + ret = rds_rdma_listen_init(); if (ret) - goto err_ib_init; - - goto out; - -err_ib_init: - rds_rdma_listen_stop(); + rds_ib_exit(); out: return ret; } diff --git a/net/rds/rds.h b/net/rds/rds.h index ebbf909b87ec3f62abec2573dcd55f4054138848..07fff73dd4f3f956c2cab393a9e834bb1215fc24 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -50,6 +50,9 @@ void rdsdebug(char *fmt, ...) #define RDS_FRAG_SHIFT 12 #define RDS_FRAG_SIZE ((unsigned int)(1 << RDS_FRAG_SHIFT)) +/* Used to limit both RDMA and non-RDMA RDS message to 1MB */ +#define RDS_MAX_MSG_SIZE ((unsigned int)(1 << 20)) + #define RDS_CONG_MAP_BYTES (65536 / 8) #define RDS_CONG_MAP_PAGES (PAGE_ALIGN(RDS_CONG_MAP_BYTES) / PAGE_SIZE) #define RDS_CONG_MAP_PAGE_BITS (PAGE_SIZE * 8) @@ -250,6 +253,11 @@ struct rds_ext_header_rdma_dest { #define RDS_EXTHDR_GEN_NUM 6 #define __RDS_EXTHDR_MAX 16 /* for now */ +#define RDS_RX_MAX_TRACES (RDS_MSG_RX_DGRAM_TRACE_MAX + 1) +#define RDS_MSG_RX_HDR 0 +#define RDS_MSG_RX_START 1 +#define RDS_MSG_RX_END 2 +#define RDS_MSG_RX_CMSG 3 struct rds_incoming { atomic_t i_refcount; @@ -262,6 +270,7 @@ struct rds_incoming { rds_rdma_cookie_t i_rdma_cookie; struct timeval i_rx_tstamp; + u64 i_rx_lat_trace[RDS_RX_MAX_TRACES]; }; struct rds_mr { @@ -419,6 +428,7 @@ struct rds_message { } rdma; struct rm_data_op { unsigned int op_active:1; + unsigned int op_notify:1; unsigned int op_nents; unsigned int op_count; unsigned int 
op_dmasg; @@ -571,6 +581,10 @@ struct rds_sock { unsigned char rs_recverr, rs_cong_monitor; u32 rs_hash_initval; + + /* Socket receive path trace points*/ + u8 rs_rx_traces; + u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX]; }; static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk) @@ -630,6 +644,9 @@ struct rds_statistics { uint64_t s_cong_update_received; uint64_t s_cong_send_error; uint64_t s_cong_send_blocked; + uint64_t s_recv_bytes_added_to_socket; + uint64_t s_recv_bytes_removed_from_socket; + }; /* af_rds.c */ diff --git a/net/rds/recv.c b/net/rds/recv.c index 9d0666e5fe35db4215ef9c7aae9903d73f1fc5b2..8b7e7b7f2c2dbebee31dd7626540b797f1fa353f 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -43,6 +43,8 @@ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn, __be32 saddr) { + int i; + atomic_set(&inc->i_refcount, 1); INIT_LIST_HEAD(&inc->i_item); inc->i_conn = conn; @@ -50,6 +52,9 @@ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn, inc->i_rdma_cookie = 0; inc->i_rx_tstamp.tv_sec = 0; inc->i_rx_tstamp.tv_usec = 0; + + for (i = 0; i < RDS_RX_MAX_TRACES; i++) + inc->i_rx_lat_trace[i] = 0; } EXPORT_SYMBOL_GPL(rds_inc_init); @@ -94,6 +99,10 @@ static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk, return; rs->rs_rcv_bytes += delta; + if (delta > 0) + rds_stats_add(s_recv_bytes_added_to_socket, delta); + else + rds_stats_add(s_recv_bytes_removed_from_socket, -delta); now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs); rdsdebug("rs %p (%pI4:%u) recv bytes %d buf %d " @@ -369,6 +378,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, if (sock_flag(sk, SOCK_RCVTSTAMP)) do_gettimeofday(&inc->i_rx_tstamp); rds_inc_addref(inc); + inc->i_rx_lat_trace[RDS_MSG_RX_END] = local_clock(); list_add_tail(&inc->i_item, &rs->rs_recv_queue); __rds_wake_sk_sleep(sk); } else { @@ -530,7 +540,7 @@ static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg, ret = 
put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST, sizeof(inc->i_rdma_cookie), &inc->i_rdma_cookie); if (ret) - return ret; + goto out; } if ((inc->i_rx_tstamp.tv_sec != 0) && @@ -539,10 +549,32 @@ static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg, sizeof(struct timeval), &inc->i_rx_tstamp); if (ret) - return ret; + goto out; } - return 0; + if (rs->rs_rx_traces) { + struct rds_cmsg_rx_trace t; + int i, j; + + /* t is copied out whole, padding and unused slots included */ + memset(&t, 0, sizeof(t)); + inc->i_rx_lat_trace[RDS_MSG_RX_CMSG] = local_clock(); + t.rx_traces = rs->rs_rx_traces; + for (i = 0; i < rs->rs_rx_traces; i++) { + j = rs->rs_rx_trace[i]; + t.rx_trace_pos[i] = j; + t.rx_trace[i] = inc->i_rx_lat_trace[j + 1] - + inc->i_rx_lat_trace[j]; + } + + ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RXPATH_LATENCY, + sizeof(t), &t); + if (ret) + goto out; + } + +out: + return ret; } int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, diff --git a/net/rds/send.c b/net/rds/send.c index 77c8c6e613adf65057d3696806024fdf2cb15247..5cc64039caf71d2378d8f7609ab77331e733c19d 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -476,12 +476,14 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) struct rm_rdma_op *ro; struct rds_notifier *notifier; unsigned long flags; + unsigned int notify = 0; spin_lock_irqsave(&rm->m_rs_lock, flags); + notify = rm->rdma.op_notify | rm->data.op_notify; ro = &rm->rdma; if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) && - ro->op_active && ro->op_notify && ro->op_notifier) { + ro->op_active && notify && ro->op_notifier) { notifier = ro->op_notifier; rs = rm->m_rs; sock_hold(rds_rs_to_sk(rs)); @@ -945,6 +947,11 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, ret = rds_cmsg_rdma_map(rs, rm, cmsg); if (!ret) *allocated_mr = 1; + else if (ret == -ENODEV) + /* Accommodate the get_mr() case which can fail + * if connection isn't established yet. 
+ */ + ret = -EAGAIN; break; case RDS_CMSG_ATOMIC_CSWP: case RDS_CMSG_ATOMIC_FADD: @@ -987,6 +994,30 @@ static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn) return hash; } +/* Add the remote_vec byte count of every RDS_CMSG_RDMA_ARGS cmsg to *rdma_bytes. */ +static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes) +{ + struct rds_rdma_args *args; + struct cmsghdr *cmsg; + + for_each_cmsghdr(cmsg, msg) { + if (!CMSG_OK(msg, cmsg)) + return -EINVAL; + + if (cmsg->cmsg_level != SOL_RDS) + continue; + + if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) { + /* the payload must hold a full rds_rdma_args before it is read */ + if (cmsg->cmsg_len < CMSG_LEN(sizeof(*args))) + return -EINVAL; + args = CMSG_DATA(cmsg); + *rdma_bytes += args->remote_vec.bytes; + } + } + return 0; +} + int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) { struct sock *sk = sock->sk; @@ -1001,6 +1032,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) int nonblock = msg->msg_flags & MSG_DONTWAIT; long timeo = sock_sndtimeo(sk, nonblock); struct rds_conn_path *cpath; + size_t total_payload_len = payload_len, rdma_payload_len = 0; /* Mirror Linux UDP mirror of BSD error message compatibility */ /* XXX: Perhaps MSG_MORE someday */ @@ -1033,6 +1065,16 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) } release_sock(sk); + ret = rds_rdma_bytes(msg, &rdma_payload_len); + if (ret) + goto out; + + total_payload_len += rdma_payload_len; + if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) { + ret = -EMSGSIZE; + goto out; + } + if (payload_len > rds_sk_sndbuf(rs)) { ret = -EMSGSIZE; goto out; @@ -1082,8 +1124,12 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) /* Parse any control messages the user may have included. 
*/ ret = rds_cmsg_send(rs, rm, msg, &allocated_mr); - if (ret) + if (ret) { + /* Trigger connection so that it is ready for the next retry */ + if (ret == -EAGAIN) + rds_conn_connect_if_down(conn); goto out; + } if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) { printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", @@ -1169,7 +1215,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) * or * RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED */ -int +static int rds_send_probe(struct rds_conn_path *cp, __be16 sport, __be16 dport, u8 h_flags) { @@ -1238,7 +1284,7 @@ rds_send_pong(struct rds_conn_path *cp, __be16 dport) return rds_send_probe(cp, 0, dport, 0); } -void +static void rds_send_ping(struct rds_connection *conn) { unsigned long flags; diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c index f74bab3ecdca69b0b59e18341a15ab8fe095b16c..67d0929c7d3d0c97ed209af9a67b4d83343c3de1 100644 --- a/net/rds/tcp_listen.c +++ b/net/rds/tcp_listen.c @@ -79,6 +79,7 @@ int rds_tcp_keepalive(struct socket *sock) * smaller ip address, we recycle conns in RDS_CONN_ERROR on the passive side * by moving them to CONNECTING in this function. */ +static struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn) { int i; diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c index ad4892e97f91b3fdd4928072ea6f9a9aeaf11352..e006ef8e6d404195f19e5d8b9bbf6683b504a7cc 100644 --- a/net/rds/tcp_recv.c +++ b/net/rds/tcp_recv.c @@ -180,6 +180,9 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, rdsdebug("alloced tinc %p\n", tinc); rds_inc_path_init(&tinc->ti_inc, cp, cp->cp_conn->c_faddr); + tinc->ti_inc.i_rx_lat_trace[RDS_MSG_RX_HDR] = + local_clock(); + /* * XXX * we might be able to use the __ variants when * we've already serialized at a higher level. 
@@ -204,6 +207,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, /* could be 0 for a 0 len message */ tc->t_tinc_data_rem = be32_to_cpu(tinc->ti_inc.i_hdr.h_len); + tinc->ti_inc.i_rx_lat_trace[RDS_MSG_RX_START] = + local_clock(); } }