提交 91d2f14b 编写于 作者: D David S. Miller

Merge branch 'net/rds/4.3-v3' of git://git.kernel.org/pub/scm/linux/kernel/git/ssantosh/linux

Santosh Shilimkar says:

====================
RDS: connection scalability and performance improvements

[v4]
Re-sending the same patches from v3 again since my repost of
patch 05/14 from v3 was whitespace damaged.

[v3]
Updated patch "[PATCH v2 05/14] RDS: defer the over_batch work to
send worker" as per David Miller's comment [4] to avoid the magic
value usage. Patch now makes use of already available but unused
send_batch_count module parameter. Rest of the patches are same as
earlier version v2 [3]

[v2]:
Dropped "[PATCH 05/15] RDS: increase size of hash-table to 8K" from
earlier version [1]. I plan to address the hash table scalability using
re-sizable hash tables as suggested by David Laight and David Miller [2]

This series addresses RDS connection bottlenecks on massive workloads and
improve the RDMA performance almost by 3X. RDS TCP also gets a small gain
of about 12%.

RDS is being used in massive systems with high scalability where several
hundred thousand end points and tens of thousands of local processes
are operating in tens of thousand sockets. Being RC(reliable connection),
socket bind and release happens very often and any inefficiencies in
bind hash look ups hurts the overall system performance. RDS bin hash-table
uses global spin-lock which is the biggest bottleneck. To make matter worst,
it uses rcu inside global lock for hash buckets.
This is being addressed by simply using per bucket rw lock which makes the
locking simple and very efficient. The hash table size is still an issue and
I plan to address it by using re-sizable hash tables as suggested on the list.

For RDS RDMA improvement, the completion handling is revamped so that we
can do batch completions. Both send and receive completion handlers are
split logically to achieve the same. RDS 8K messages being one of the
key usecase, mr pool is adapted to have the 8K mrs along with default 1M
mrs. And while doing this, few fixes and couple of bottlenecks seen with
rds_sendmsg() are addressed.

Series applies against 4.3-rc1 as well net-next. Its tested on Oracle
hardware with IB fabric for both bcopy as well as RDMA mode. RDS TCP is
tested with iXGB NIC. Like last time, iWARP transport is untested with
these changes. The patchset is also available at below git repo:

git://git.kernel.org/pub/scm/linux/kernel/git/ssantosh/linux.git net/rds/4.3-v3

As a side note, the IB HCA driver I used for testing misses at least 3
important patches in upstream to see the full blown IB performance and
am hoping to get that in mainline with help of them.
====================
Signed-off-by: NDavid S. Miller <davem@davemloft.net>
......@@ -72,13 +72,7 @@ static int rds_release(struct socket *sock)
rds_clear_recv_queue(rs);
rds_cong_remove_socket(rs);
/*
* the binding lookup hash uses rcu, we need to
* make sure we synchronize_rcu before we free our
* entry
*/
rds_remove_bound(rs);
synchronize_rcu();
rds_send_drop_to(rs, NULL);
rds_rdma_drop_keys(rs);
......@@ -588,6 +582,8 @@ static int rds_init(void)
{
int ret;
rds_bind_lock_init();
ret = rds_conn_init();
if (ret)
goto out;
......
......@@ -38,48 +38,50 @@
#include <linux/ratelimit.h>
#include "rds.h"
struct bind_bucket {
rwlock_t lock;
struct hlist_head head;
};
#define BIND_HASH_SIZE 1024
static struct hlist_head bind_hash_table[BIND_HASH_SIZE];
static DEFINE_SPINLOCK(rds_bind_lock);
static struct bind_bucket bind_hash_table[BIND_HASH_SIZE];
static struct hlist_head *hash_to_bucket(__be32 addr, __be16 port)
static struct bind_bucket *hash_to_bucket(__be32 addr, __be16 port)
{
return bind_hash_table + (jhash_2words((u32)addr, (u32)port, 0) &
(BIND_HASH_SIZE - 1));
}
static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port,
/* must hold either read or write lock (write lock for insert != NULL) */
static struct rds_sock *rds_bind_lookup(struct bind_bucket *bucket,
__be32 addr, __be16 port,
struct rds_sock *insert)
{
struct rds_sock *rs;
struct hlist_head *head = hash_to_bucket(addr, port);
struct hlist_head *head = &bucket->head;
u64 cmp;
u64 needle = ((u64)be32_to_cpu(addr) << 32) | be16_to_cpu(port);
rcu_read_lock();
hlist_for_each_entry_rcu(rs, head, rs_bound_node) {
hlist_for_each_entry(rs, head, rs_bound_node) {
cmp = ((u64)be32_to_cpu(rs->rs_bound_addr) << 32) |
be16_to_cpu(rs->rs_bound_port);
if (cmp == needle) {
rcu_read_unlock();
rds_sock_addref(rs);
return rs;
}
}
rcu_read_unlock();
if (insert) {
/*
* make sure our addr and port are set before
* we are added to the list, other people
* in rcu will find us as soon as the
* hlist_add_head_rcu is done
* we are added to the list.
*/
insert->rs_bound_addr = addr;
insert->rs_bound_port = port;
rds_sock_addref(insert);
hlist_add_head_rcu(&insert->rs_bound_node, head);
hlist_add_head(&insert->rs_bound_node, head);
}
return NULL;
}
......@@ -93,16 +95,21 @@ static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port,
struct rds_sock *rds_find_bound(__be32 addr, __be16 port)
{
struct rds_sock *rs;
unsigned long flags;
struct bind_bucket *bucket = hash_to_bucket(addr, port);
rs = rds_bind_lookup(addr, port, NULL);
read_lock_irqsave(&bucket->lock, flags);
rs = rds_bind_lookup(bucket, addr, port, NULL);
read_unlock_irqrestore(&bucket->lock, flags);
if (rs && !sock_flag(rds_rs_to_sk(rs), SOCK_DEAD))
rds_sock_addref(rs);
else
if (rs && sock_flag(rds_rs_to_sk(rs), SOCK_DEAD)) {
rds_sock_put(rs);
rs = NULL;
}
rdsdebug("returning rs %p for %pI4:%u\n", rs, &addr,
ntohs(port));
return rs;
}
......@@ -112,6 +119,7 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
unsigned long flags;
int ret = -EADDRINUSE;
u16 rover, last;
struct bind_bucket *bucket;
if (*port != 0) {
rover = be16_to_cpu(*port);
......@@ -121,42 +129,48 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
last = rover - 1;
}
spin_lock_irqsave(&rds_bind_lock, flags);
do {
struct rds_sock *rrs;
if (rover == 0)
rover++;
if (!rds_bind_lookup(addr, cpu_to_be16(rover), rs)) {
bucket = hash_to_bucket(addr, cpu_to_be16(rover));
write_lock_irqsave(&bucket->lock, flags);
rrs = rds_bind_lookup(bucket, addr, cpu_to_be16(rover), rs);
write_unlock_irqrestore(&bucket->lock, flags);
if (!rrs) {
*port = rs->rs_bound_port;
ret = 0;
rdsdebug("rs %p binding to %pI4:%d\n",
rs, &addr, (int)ntohs(*port));
break;
} else {
rds_sock_put(rrs);
}
} while (rover++ != last);
spin_unlock_irqrestore(&rds_bind_lock, flags);
return ret;
}
void rds_remove_bound(struct rds_sock *rs)
{
unsigned long flags;
struct bind_bucket *bucket =
hash_to_bucket(rs->rs_bound_addr, rs->rs_bound_port);
spin_lock_irqsave(&rds_bind_lock, flags);
write_lock_irqsave(&bucket->lock, flags);
if (rs->rs_bound_addr) {
rdsdebug("rs %p unbinding from %pI4:%d\n",
rs, &rs->rs_bound_addr,
ntohs(rs->rs_bound_port));
hlist_del_init_rcu(&rs->rs_bound_node);
hlist_del_init(&rs->rs_bound_node);
rds_sock_put(rs);
rs->rs_bound_addr = 0;
}
spin_unlock_irqrestore(&rds_bind_lock, flags);
write_unlock_irqrestore(&bucket->lock, flags);
}
int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
......@@ -200,9 +214,13 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
out:
release_sock(sk);
/* we might have called rds_remove_bound on error */
if (ret)
synchronize_rcu();
return ret;
}
void rds_bind_lock_init(void)
{
int i;
for (i = 0; i < BIND_HASH_SIZE; i++)
rwlock_init(&bind_hash_table[i].lock);
}
......@@ -43,14 +43,14 @@
#include "rds.h"
#include "ib.h"
static unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
unsigned int rds_ib_fmr_1m_pool_size = RDS_FMR_1M_POOL_SIZE;
unsigned int rds_ib_fmr_8k_pool_size = RDS_FMR_8K_POOL_SIZE;
unsigned int rds_ib_retry_count = RDS_IB_DEFAULT_RETRY_COUNT;
module_param(fmr_pool_size, int, 0444);
MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
module_param(fmr_message_size, int, 0444);
MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer");
module_param(rds_ib_fmr_1m_pool_size, int, 0444);
MODULE_PARM_DESC(rds_ib_fmr_1m_pool_size, " Max number of 1M fmr per HCA");
module_param(rds_ib_fmr_8k_pool_size, int, 0444);
MODULE_PARM_DESC(rds_ib_fmr_8k_pool_size, " Max number of 8K fmr per HCA");
module_param(rds_ib_retry_count, int, 0444);
MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error");
......@@ -97,8 +97,10 @@ static void rds_ib_dev_free(struct work_struct *work)
struct rds_ib_device *rds_ibdev = container_of(work,
struct rds_ib_device, free_work);
if (rds_ibdev->mr_pool)
rds_ib_destroy_mr_pool(rds_ibdev->mr_pool);
if (rds_ibdev->mr_8k_pool)
rds_ib_destroy_mr_pool(rds_ibdev->mr_8k_pool);
if (rds_ibdev->mr_1m_pool)
rds_ib_destroy_mr_pool(rds_ibdev->mr_1m_pool);
if (rds_ibdev->pd)
ib_dealloc_pd(rds_ibdev->pd);
......@@ -148,9 +150,13 @@ static void rds_ib_add_one(struct ib_device *device)
rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
rds_ibdev->max_fmrs = dev_attr->max_fmr ?
min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
fmr_pool_size;
rds_ibdev->max_1m_fmrs = dev_attr->max_mr ?
min_t(unsigned int, (dev_attr->max_mr / 2),
rds_ib_fmr_1m_pool_size) : rds_ib_fmr_1m_pool_size;
rds_ibdev->max_8k_fmrs = dev_attr->max_mr ?
min_t(unsigned int, ((dev_attr->max_mr / 2) * RDS_MR_8K_SCALE),
rds_ib_fmr_8k_pool_size) : rds_ib_fmr_8k_pool_size;
rds_ibdev->max_initiator_depth = dev_attr->max_qp_init_rd_atom;
rds_ibdev->max_responder_resources = dev_attr->max_qp_rd_atom;
......@@ -162,12 +168,25 @@ static void rds_ib_add_one(struct ib_device *device)
goto put_dev;
}
rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
if (IS_ERR(rds_ibdev->mr_pool)) {
rds_ibdev->mr_pool = NULL;
rds_ibdev->mr_1m_pool =
rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_1M_POOL);
if (IS_ERR(rds_ibdev->mr_1m_pool)) {
rds_ibdev->mr_1m_pool = NULL;
goto put_dev;
}
rds_ibdev->mr_8k_pool =
rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_8K_POOL);
if (IS_ERR(rds_ibdev->mr_8k_pool)) {
rds_ibdev->mr_8k_pool = NULL;
goto put_dev;
}
rdsdebug("RDS/IB: max_mr = %d, max_wrs = %d, max_sge = %d, fmr_max_remaps = %d, max_1m_fmrs = %d, max_8k_fmrs = %d\n",
dev_attr->max_fmr, rds_ibdev->max_wrs, rds_ibdev->max_sge,
rds_ibdev->fmr_max_remaps, rds_ibdev->max_1m_fmrs,
rds_ibdev->max_8k_fmrs);
INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
INIT_LIST_HEAD(&rds_ibdev->conn_list);
......
......@@ -9,8 +9,11 @@
#include "rds.h"
#include "rdma_transport.h"
#define RDS_FMR_SIZE 256
#define RDS_FMR_POOL_SIZE 8192
#define RDS_FMR_1M_POOL_SIZE (8192 / 2)
#define RDS_FMR_1M_MSG_SIZE 256
#define RDS_FMR_8K_MSG_SIZE 2
#define RDS_MR_8K_SCALE (256 / (RDS_FMR_8K_MSG_SIZE + 1))
#define RDS_FMR_8K_POOL_SIZE (RDS_MR_8K_SCALE * (8192 / 2))
#define RDS_IB_MAX_SGE 8
#define RDS_IB_RECV_SGE 2
......@@ -24,6 +27,9 @@
#define RDS_IB_RECYCLE_BATCH_COUNT 32
#define RDS_IB_WC_MAX 32
#define RDS_IB_SEND_OP BIT_ULL(63)
extern struct rw_semaphore rds_ib_devices_lock;
extern struct list_head rds_ib_devices;
......@@ -89,6 +95,20 @@ struct rds_ib_work_ring {
atomic_t w_free_ctr;
};
/* Rings are posted with all the allocations they'll need to queue the
* incoming message to the receiving socket so this can't fail.
* All fragments start with a header, so we can make sure we're not receiving
* garbage, and we can tell a small 8 byte fragment from an ACK frame.
*/
struct rds_ib_ack_state {
u64 ack_next;
u64 ack_recv;
unsigned int ack_required:1;
unsigned int ack_next_valid:1;
unsigned int ack_recv_valid:1;
};
struct rds_ib_device;
struct rds_ib_connection {
......@@ -102,6 +122,12 @@ struct rds_ib_connection {
struct ib_pd *i_pd;
struct ib_cq *i_send_cq;
struct ib_cq *i_recv_cq;
struct ib_wc i_send_wc[RDS_IB_WC_MAX];
struct ib_wc i_recv_wc[RDS_IB_WC_MAX];
/* interrupt handling */
struct tasklet_struct i_send_tasklet;
struct tasklet_struct i_recv_tasklet;
/* tx */
struct rds_ib_work_ring i_send_ring;
......@@ -112,7 +138,6 @@ struct rds_ib_connection {
atomic_t i_signaled_sends;
/* rx */
struct tasklet_struct i_recv_tasklet;
struct mutex i_recv_mutex;
struct rds_ib_work_ring i_recv_ring;
struct rds_ib_incoming *i_ibinc;
......@@ -164,6 +189,12 @@ struct rds_ib_connection {
struct rds_ib_ipaddr {
struct list_head list;
__be32 ipaddr;
struct rcu_head rcu;
};
enum {
RDS_IB_MR_8K_POOL,
RDS_IB_MR_1M_POOL,
};
struct rds_ib_device {
......@@ -172,9 +203,12 @@ struct rds_ib_device {
struct list_head conn_list;
struct ib_device *dev;
struct ib_pd *pd;
struct rds_ib_mr_pool *mr_pool;
unsigned int fmr_max_remaps;
unsigned int max_fmrs;
struct rds_ib_mr_pool *mr_1m_pool;
struct rds_ib_mr_pool *mr_8k_pool;
unsigned int fmr_max_remaps;
unsigned int max_8k_fmrs;
unsigned int max_1m_fmrs;
int max_sge;
unsigned int max_wrs;
unsigned int max_initiator_depth;
......@@ -197,14 +231,14 @@ struct rds_ib_device {
struct rds_ib_statistics {
uint64_t s_ib_connect_raced;
uint64_t s_ib_listen_closed_stale;
uint64_t s_ib_tx_cq_call;
uint64_t s_ib_evt_handler_call;
uint64_t s_ib_tasklet_call;
uint64_t s_ib_tx_cq_event;
uint64_t s_ib_tx_ring_full;
uint64_t s_ib_tx_throttle;
uint64_t s_ib_tx_sg_mapping_failure;
uint64_t s_ib_tx_stalled;
uint64_t s_ib_tx_credit_updates;
uint64_t s_ib_rx_cq_call;
uint64_t s_ib_rx_cq_event;
uint64_t s_ib_rx_ring_empty;
uint64_t s_ib_rx_refill_from_cq;
......@@ -216,12 +250,18 @@ struct rds_ib_statistics {
uint64_t s_ib_ack_send_delayed;
uint64_t s_ib_ack_send_piggybacked;
uint64_t s_ib_ack_received;
uint64_t s_ib_rdma_mr_alloc;
uint64_t s_ib_rdma_mr_free;
uint64_t s_ib_rdma_mr_used;
uint64_t s_ib_rdma_mr_pool_flush;
uint64_t s_ib_rdma_mr_pool_wait;
uint64_t s_ib_rdma_mr_pool_depleted;
uint64_t s_ib_rdma_mr_8k_alloc;
uint64_t s_ib_rdma_mr_8k_free;
uint64_t s_ib_rdma_mr_8k_used;
uint64_t s_ib_rdma_mr_8k_pool_flush;
uint64_t s_ib_rdma_mr_8k_pool_wait;
uint64_t s_ib_rdma_mr_8k_pool_depleted;
uint64_t s_ib_rdma_mr_1m_alloc;
uint64_t s_ib_rdma_mr_1m_free;
uint64_t s_ib_rdma_mr_1m_used;
uint64_t s_ib_rdma_mr_1m_pool_flush;
uint64_t s_ib_rdma_mr_1m_pool_wait;
uint64_t s_ib_rdma_mr_1m_pool_depleted;
uint64_t s_ib_atomic_cswp;
uint64_t s_ib_atomic_fadd;
};
......@@ -273,7 +313,8 @@ struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device);
void rds_ib_dev_put(struct rds_ib_device *rds_ibdev);
extern struct ib_client rds_ib_client;
extern unsigned int fmr_message_size;
extern unsigned int rds_ib_fmr_1m_pool_size;
extern unsigned int rds_ib_fmr_8k_pool_size;
extern unsigned int rds_ib_retry_count;
extern spinlock_t ib_nodev_conns_lock;
......@@ -303,7 +344,8 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
void rds_ib_destroy_nodev_conns(void);
struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_dev,
int npages);
void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo);
void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
......@@ -323,7 +365,8 @@ void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp);
void rds_ib_inc_free(struct rds_incoming *inc);
int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to);
void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context);
void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc,
struct rds_ib_ack_state *state);
void rds_ib_recv_tasklet_fn(unsigned long data);
void rds_ib_recv_init_ring(struct rds_ib_connection *ic);
void rds_ib_recv_clear_ring(struct rds_ib_connection *ic);
......@@ -331,6 +374,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic);
void rds_ib_attempt_ack(struct rds_ib_connection *ic);
void rds_ib_ack_send_complete(struct rds_ib_connection *ic);
u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic);
void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required);
/* ib_ring.c */
void rds_ib_ring_init(struct rds_ib_work_ring *ring, u32 nr);
......@@ -348,7 +392,7 @@ extern wait_queue_head_t rds_ib_ring_empty_wait;
void rds_ib_xmit_complete(struct rds_connection *conn);
int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg, unsigned int off);
void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc);
void rds_ib_send_init_ring(struct rds_ib_connection *ic);
void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op);
......
......@@ -216,6 +216,96 @@ static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
event->event, ib_event_msg(event->event), data);
}
/* Plucking the oldest entry from the ring can be done concurrently with
* the thread refilling the ring. Each ring operation is protected by
* spinlocks and the transient state of refilling doesn't change the
* recording of which entry is oldest.
*
* This relies on IB only calling one cq comp_handler for each cq so that
* there will only be one caller of rds_recv_incoming() per RDS connection.
*/
static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
{
struct rds_connection *conn = context;
struct rds_ib_connection *ic = conn->c_transport_data;
rdsdebug("conn %p cq %p\n", conn, cq);
rds_ib_stats_inc(s_ib_evt_handler_call);
tasklet_schedule(&ic->i_recv_tasklet);
}
static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
struct ib_wc *wcs,
struct rds_ib_ack_state *ack_state)
{
int nr;
int i;
struct ib_wc *wc;
while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0) {
for (i = 0; i < nr; i++) {
wc = wcs + i;
rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
(unsigned long long)wc->wr_id, wc->status,
wc->byte_len, be32_to_cpu(wc->ex.imm_data));
if (wc->wr_id & RDS_IB_SEND_OP)
rds_ib_send_cqe_handler(ic, wc);
else
rds_ib_recv_cqe_handler(ic, wc, ack_state);
}
}
}
static void rds_ib_tasklet_fn_send(unsigned long data)
{
struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
struct rds_connection *conn = ic->conn;
struct rds_ib_ack_state state;
rds_ib_stats_inc(s_ib_tasklet_call);
memset(&state, 0, sizeof(state));
poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
if (rds_conn_up(conn) &&
(!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
test_bit(0, &conn->c_map_queued)))
rds_send_xmit(ic->conn);
}
static void rds_ib_tasklet_fn_recv(unsigned long data)
{
struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
struct rds_connection *conn = ic->conn;
struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
struct rds_ib_ack_state state;
if (!rds_ibdev)
rds_conn_drop(conn);
rds_ib_stats_inc(s_ib_tasklet_call);
memset(&state, 0, sizeof(state));
poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
if (state.ack_next_valid)
rds_ib_set_ack(ic, state.ack_next, state.ack_required);
if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
rds_send_drop_acked(conn, state.ack_recv, NULL);
ic->i_ack_recv = state.ack_recv;
}
if (rds_conn_up(conn))
rds_ib_attempt_ack(ic);
}
static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
{
struct rds_connection *conn = data;
......@@ -238,6 +328,18 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
}
}
static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
{
struct rds_connection *conn = context;
struct rds_ib_connection *ic = conn->c_transport_data;
rdsdebug("conn %p cq %p\n", conn, cq);
rds_ib_stats_inc(s_ib_evt_handler_call);
tasklet_schedule(&ic->i_send_tasklet);
}
/*
* This needs to be very careful to not leave IS_ERR pointers around for
* cleanup to trip over.
......@@ -271,7 +373,8 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
ic->i_pd = rds_ibdev->pd;
cq_attr.cqe = ic->i_send_ring.w_nr + 1;
ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
rds_ib_cq_event_handler, conn,
&cq_attr);
if (IS_ERR(ic->i_send_cq)) {
......@@ -282,7 +385,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
}
cq_attr.cqe = ic->i_recv_ring.w_nr;
ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
rds_ib_cq_event_handler, conn,
&cq_attr);
if (IS_ERR(ic->i_recv_cq)) {
......@@ -637,6 +740,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
wait_event(rds_ib_ring_empty_wait,
rds_ib_ring_empty(&ic->i_recv_ring) &&
(atomic_read(&ic->i_signaled_sends) == 0));
tasklet_kill(&ic->i_send_tasklet);
tasklet_kill(&ic->i_recv_tasklet);
/* first destroy the ib state that generates callbacks */
......@@ -743,8 +847,10 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
}
INIT_LIST_HEAD(&ic->ib_node);
tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn,
(unsigned long) ic);
tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
(unsigned long)ic);
tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
(unsigned long)ic);
mutex_init(&ic->i_recv_mutex);
#ifndef KERNEL_HAS_ATOMIC64
spin_lock_init(&ic->i_ack_lock);
......
......@@ -65,6 +65,7 @@ struct rds_ib_mr {
* Our own little FMR pool
*/
struct rds_ib_mr_pool {
unsigned int pool_type;
struct mutex flush_lock; /* serialize fmr invalidate */
struct delayed_work flush_worker; /* flush worker */
......@@ -83,7 +84,7 @@ struct rds_ib_mr_pool {
struct ib_fmr_attr fmr_attr;
};
struct workqueue_struct *rds_ib_fmr_wq;
static struct workqueue_struct *rds_ib_fmr_wq;
int rds_ib_fmr_init(void)
{
......@@ -159,10 +160,8 @@ static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
}
spin_unlock_irq(&rds_ibdev->spinlock);
if (to_free) {
synchronize_rcu();
kfree(to_free);
}
if (to_free)
kfree_rcu(to_free, rcu);
}
int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
......@@ -236,7 +235,8 @@ void rds_ib_destroy_nodev_conns(void)
rds_conn_destroy(ic->conn);
}
struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
int pool_type)
{
struct rds_ib_mr_pool *pool;
......@@ -244,6 +244,7 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
if (!pool)
return ERR_PTR(-ENOMEM);
pool->pool_type = pool_type;
init_llist_head(&pool->free_list);
init_llist_head(&pool->drop_list);
init_llist_head(&pool->clean_list);
......@@ -251,28 +252,30 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
init_waitqueue_head(&pool->flush_wait);
INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
pool->fmr_attr.max_pages = fmr_message_size;
if (pool_type == RDS_IB_MR_1M_POOL) {
/* +1 allows for unaligned MRs */
pool->fmr_attr.max_pages = RDS_FMR_1M_MSG_SIZE + 1;
pool->max_items = RDS_FMR_1M_POOL_SIZE;
} else {
/* pool_type == RDS_IB_MR_8K_POOL */
pool->fmr_attr.max_pages = RDS_FMR_8K_MSG_SIZE + 1;
pool->max_items = RDS_FMR_8K_POOL_SIZE;
}
pool->max_free_pinned = pool->max_items * pool->fmr_attr.max_pages / 4;
pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
pool->fmr_attr.page_shift = PAGE_SHIFT;
pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
/* We never allow more than max_items MRs to be allocated.
* When we exceed more than max_items_soft, we start freeing
* items more aggressively.
* Make sure that max_items > max_items_soft > max_items / 2
*/
pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
pool->max_items = rds_ibdev->max_fmrs;
return pool;
}
void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo)
{
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
struct rds_ib_mr_pool *pool_1m = rds_ibdev->mr_1m_pool;
iinfo->rdma_mr_max = pool->max_items;
iinfo->rdma_mr_size = pool->fmr_attr.max_pages;
iinfo->rdma_mr_max = pool_1m->max_items;
iinfo->rdma_mr_size = pool_1m->fmr_attr.max_pages;
}
void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
......@@ -314,14 +317,28 @@ static inline void wait_clean_list_grace(void)
}
}
static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev,
int npages)
{
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
struct rds_ib_mr_pool *pool;
struct rds_ib_mr *ibmr = NULL;
int err = 0, iter = 0;
if (npages <= RDS_FMR_8K_MSG_SIZE)
pool = rds_ibdev->mr_8k_pool;
else
pool = rds_ibdev->mr_1m_pool;
if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
schedule_delayed_work(&pool->flush_worker, 10);
queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
/* Switch pools if one of the pool is reaching upper limit */
if (atomic_read(&pool->dirty_count) >= pool->max_items * 9 / 10) {
if (pool->pool_type == RDS_IB_MR_8K_POOL)
pool = rds_ibdev->mr_1m_pool;
else
pool = rds_ibdev->mr_8k_pool;
}
while (1) {
ibmr = rds_ib_reuse_fmr(pool);
......@@ -343,12 +360,18 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
atomic_dec(&pool->item_count);
if (++iter > 2) {
rds_ib_stats_inc(s_ib_rdma_mr_pool_depleted);
if (pool->pool_type == RDS_IB_MR_8K_POOL)
rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted);
else
rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted);
return ERR_PTR(-EAGAIN);
}
/* We do have some empty MRs. Flush them out. */
rds_ib_stats_inc(s_ib_rdma_mr_pool_wait);
if (pool->pool_type == RDS_IB_MR_8K_POOL)
rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait);
else
rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait);
rds_ib_flush_mr_pool(pool, 0, &ibmr);
if (ibmr)
return ibmr;
......@@ -373,7 +396,12 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
goto out_no_cigar;
}
rds_ib_stats_inc(s_ib_rdma_mr_alloc);
ibmr->pool = pool;
if (pool->pool_type == RDS_IB_MR_8K_POOL)
rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc);
else
rds_ib_stats_inc(s_ib_rdma_mr_1m_alloc);
return ibmr;
out_no_cigar:
......@@ -429,7 +457,7 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
}
page_cnt += len >> PAGE_SHIFT;
if (page_cnt > fmr_message_size)
if (page_cnt > ibmr->pool->fmr_attr.max_pages)
return -EINVAL;
dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC,
......@@ -461,7 +489,10 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
ibmr->sg_dma_len = sg_dma_len;
ibmr->remap_count++;
rds_ib_stats_inc(s_ib_rdma_mr_used);
if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
rds_ib_stats_inc(s_ib_rdma_mr_8k_used);
else
rds_ib_stats_inc(s_ib_rdma_mr_1m_used);
ret = 0;
out:
......@@ -524,8 +555,7 @@ static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
__rds_ib_teardown_mr(ibmr);
if (pinned) {
struct rds_ib_device *rds_ibdev = ibmr->device;
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
struct rds_ib_mr_pool *pool = ibmr->pool;
atomic_sub(pinned, &pool->free_pinned);
}
......@@ -594,7 +624,7 @@ static void list_to_llist_nodes(struct rds_ib_mr_pool *pool,
* to free as many MRs as needed to get back to this limit.
*/
static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
int free_all, struct rds_ib_mr **ibmr_ret)
int free_all, struct rds_ib_mr **ibmr_ret)
{
struct rds_ib_mr *ibmr, *next;
struct llist_node *clean_nodes;
......@@ -605,11 +635,14 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
unsigned int nfreed = 0, dirty_to_clean = 0, free_goal;
int ret = 0;
rds_ib_stats_inc(s_ib_rdma_mr_pool_flush);
if (pool->pool_type == RDS_IB_MR_8K_POOL)
rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_flush);
else
rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_flush);
if (ibmr_ret) {
DEFINE_WAIT(wait);
while(!mutex_trylock(&pool->flush_lock)) {
while (!mutex_trylock(&pool->flush_lock)) {
ibmr = rds_ib_reuse_fmr(pool);
if (ibmr) {
*ibmr_ret = ibmr;
......@@ -666,8 +699,12 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) {
unpinned += ibmr->sg_len;
__rds_ib_teardown_mr(ibmr);
if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
rds_ib_stats_inc(s_ib_rdma_mr_free);
if (nfreed < free_goal ||
ibmr->remap_count >= pool->fmr_attr.max_maps) {
if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
rds_ib_stats_inc(s_ib_rdma_mr_8k_free);
else
rds_ib_stats_inc(s_ib_rdma_mr_1m_free);
list_del(&ibmr->unmap_list);
ib_dealloc_fmr(ibmr->fmr);
kfree(ibmr);
......@@ -719,8 +756,8 @@ static void rds_ib_mr_pool_flush_worker(struct work_struct *work)
void rds_ib_free_mr(void *trans_private, int invalidate)
{
struct rds_ib_mr *ibmr = trans_private;
struct rds_ib_mr_pool *pool = ibmr->pool;
struct rds_ib_device *rds_ibdev = ibmr->device;
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
......@@ -759,10 +796,11 @@ void rds_ib_flush_mrs(void)
down_read(&rds_ib_devices_lock);
list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
if (rds_ibdev->mr_8k_pool)
rds_ib_flush_mr_pool(rds_ibdev->mr_8k_pool, 0, NULL);
if (pool)
rds_ib_flush_mr_pool(pool, 0, NULL);
if (rds_ibdev->mr_1m_pool)
rds_ib_flush_mr_pool(rds_ibdev->mr_1m_pool, 0, NULL);
}
up_read(&rds_ib_devices_lock);
}
......@@ -780,12 +818,12 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
goto out;
}
if (!rds_ibdev->mr_pool) {
if (!rds_ibdev->mr_8k_pool || !rds_ibdev->mr_1m_pool) {
ret = -ENODEV;
goto out;
}
ibmr = rds_ib_alloc_fmr(rds_ibdev);
ibmr = rds_ib_alloc_fmr(rds_ibdev, nents);
if (IS_ERR(ibmr)) {
rds_ib_dev_put(rds_ibdev);
return ibmr;
......
......@@ -596,8 +596,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
* wr_id and avoids working with the ring in that case.
*/
#ifndef KERNEL_HAS_ATOMIC64
static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
int ack_required)
void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
{
unsigned long flags;
......@@ -622,8 +621,7 @@ static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
return seq;
}
#else
static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
int ack_required)
void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
{
atomic64_set(&ic->i_ack_next, seq);
if (ack_required) {
......@@ -830,20 +828,6 @@ static void rds_ib_cong_recv(struct rds_connection *conn,
rds_cong_map_updated(map, uncongested);
}
/*
* Rings are posted with all the allocations they'll need to queue the
* incoming message to the receiving socket so this can't fail.
* All fragments start with a header, so we can make sure we're not receiving
* garbage, and we can tell a small 8 byte fragment from an ACK frame.
*/
struct rds_ib_ack_state {
u64 ack_next;
u64 ack_recv;
unsigned int ack_required:1;
unsigned int ack_next_valid:1;
unsigned int ack_recv_valid:1;
};
static void rds_ib_process_recv(struct rds_connection *conn,
struct rds_ib_recv_work *recv, u32 data_len,
struct rds_ib_ack_state *state)
......@@ -969,96 +953,50 @@ static void rds_ib_process_recv(struct rds_connection *conn,
}
}
/*
* Plucking the oldest entry from the ring can be done concurrently with
* the thread refilling the ring. Each ring operation is protected by
* spinlocks and the transient state of refilling doesn't change the
* recording of which entry is oldest.
*
* This relies on IB only calling one cq comp_handler for each cq so that
* there will only be one caller of rds_recv_incoming() per RDS connection.
*/
void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
{
struct rds_connection *conn = context;
struct rds_ib_connection *ic = conn->c_transport_data;
rdsdebug("conn %p cq %p\n", conn, cq);
rds_ib_stats_inc(s_ib_rx_cq_call);
tasklet_schedule(&ic->i_recv_tasklet);
}
static inline void rds_poll_cq(struct rds_ib_connection *ic,
struct rds_ib_ack_state *state)
void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
struct ib_wc *wc,
struct rds_ib_ack_state *state)
{
struct rds_connection *conn = ic->conn;
struct ib_wc wc;
struct rds_ib_recv_work *recv;
while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
(unsigned long long)wc.wr_id, wc.status,
ib_wc_status_msg(wc.status), wc.byte_len,
be32_to_cpu(wc.ex.imm_data));
rds_ib_stats_inc(s_ib_rx_cq_event);
rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
(unsigned long long)wc->wr_id, wc->status,
ib_wc_status_msg(wc->status), wc->byte_len,
be32_to_cpu(wc->ex.imm_data));
recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
/*
* Also process recvs in connecting state because it is possible
* to get a recv completion _before_ the rdmacm ESTABLISHED
* event is processed.
*/
if (wc.status == IB_WC_SUCCESS) {
rds_ib_process_recv(conn, recv, wc.byte_len, state);
} else {
/* We expect errors as the qp is drained during shutdown */
if (rds_conn_up(conn) || rds_conn_connecting(conn))
rds_ib_conn_error(conn, "recv completion on %pI4 had "
"status %u (%s), disconnecting and "
"reconnecting\n", &conn->c_faddr,
wc.status,
ib_wc_status_msg(wc.status));
}
rds_ib_stats_inc(s_ib_rx_cq_event);
recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1,
DMA_FROM_DEVICE);
/*
* rds_ib_process_recv() doesn't always consume the frag, and
* we might not have called it at all if the wc didn't indicate
* success. We already unmapped the frag's pages, though, and
* the following rds_ib_ring_free() call tells the refill path
* that it will not find an allocated frag here. Make sure we
* keep that promise by freeing a frag that's still on the ring.
*/
if (recv->r_frag) {
rds_ib_frag_free(ic, recv->r_frag);
recv->r_frag = NULL;
}
rds_ib_ring_free(&ic->i_recv_ring, 1);
/* Also process recvs in connecting state because it is possible
* to get a recv completion _before_ the rdmacm ESTABLISHED
* event is processed.
*/
if (wc->status == IB_WC_SUCCESS) {
rds_ib_process_recv(conn, recv, wc->byte_len, state);
} else {
/* We expect errors as the qp is drained during shutdown */
if (rds_conn_up(conn) || rds_conn_connecting(conn))
rds_ib_conn_error(conn, "recv completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
&conn->c_faddr,
wc->status,
ib_wc_status_msg(wc->status));
}
}
void rds_ib_recv_tasklet_fn(unsigned long data)
{
struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
struct rds_connection *conn = ic->conn;
struct rds_ib_ack_state state = { 0, };
rds_poll_cq(ic, &state);
ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
rds_poll_cq(ic, &state);
if (state.ack_next_valid)
rds_ib_set_ack(ic, state.ack_next, state.ack_required);
if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
rds_send_drop_acked(conn, state.ack_recv, NULL);
ic->i_ack_recv = state.ack_recv;
/* rds_ib_process_recv() doesn't always consume the frag, and
* we might not have called it at all if the wc didn't indicate
* success. We already unmapped the frag's pages, though, and
* the following rds_ib_ring_free() call tells the refill path
* that it will not find an allocated frag here. Make sure we
* keep that promise by freeing a frag that's still on the ring.
*/
if (recv->r_frag) {
rds_ib_frag_free(ic, recv->r_frag);
recv->r_frag = NULL;
}
if (rds_conn_up(conn))
rds_ib_attempt_ack(ic);
rds_ib_ring_free(&ic->i_recv_ring, 1);
/* If we ever end up with a really empty receive ring, we're
* in deep trouble, as the sender will definitely see RNR
......
......@@ -195,7 +195,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
send->s_op = NULL;
send->s_wr.wr_id = i;
send->s_wr.wr_id = i | RDS_IB_SEND_OP;
send->s_wr.sg_list = send->s_sge;
send->s_wr.ex.imm_data = 0;
......@@ -237,81 +237,73 @@ static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
* unallocs the next free entry in the ring it doesn't alter which is
* the next to be freed, which is what this is concerned with.
*/
void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
{
struct rds_connection *conn = context;
struct rds_ib_connection *ic = conn->c_transport_data;
struct rds_message *rm = NULL;
struct ib_wc wc;
struct rds_connection *conn = ic->conn;
struct rds_ib_send_work *send;
u32 completed;
u32 oldest;
u32 i = 0;
int ret;
int nr_sig = 0;
rdsdebug("cq %p conn %p\n", cq, conn);
rds_ib_stats_inc(s_ib_tx_cq_call);
ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
if (ret)
rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
while (ib_poll_cq(cq, 1, &wc) > 0) {
rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
(unsigned long long)wc.wr_id, wc.status,
ib_wc_status_msg(wc.status), wc.byte_len,
be32_to_cpu(wc.ex.imm_data));
rds_ib_stats_inc(s_ib_tx_cq_event);
if (wc.wr_id == RDS_IB_ACK_WR_ID) {
if (time_after(jiffies, ic->i_ack_queued + HZ/2))
rds_ib_stats_inc(s_ib_tx_stalled);
rds_ib_ack_send_complete(ic);
continue;
}
oldest = rds_ib_ring_oldest(&ic->i_send_ring);
rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
(unsigned long long)wc->wr_id, wc->status,
ib_wc_status_msg(wc->status), wc->byte_len,
be32_to_cpu(wc->ex.imm_data));
rds_ib_stats_inc(s_ib_tx_cq_event);
completed = rds_ib_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
if (wc->wr_id == RDS_IB_ACK_WR_ID) {
if (time_after(jiffies, ic->i_ack_queued + HZ / 2))
rds_ib_stats_inc(s_ib_tx_stalled);
rds_ib_ack_send_complete(ic);
return;
}
for (i = 0; i < completed; i++) {
send = &ic->i_sends[oldest];
if (send->s_wr.send_flags & IB_SEND_SIGNALED)
nr_sig++;
oldest = rds_ib_ring_oldest(&ic->i_send_ring);
rm = rds_ib_send_unmap_op(ic, send, wc.status);
completed = rds_ib_ring_completed(&ic->i_send_ring,
(wc->wr_id & ~RDS_IB_SEND_OP),
oldest);
if (time_after(jiffies, send->s_queued + HZ/2))
rds_ib_stats_inc(s_ib_tx_stalled);
for (i = 0; i < completed; i++) {
send = &ic->i_sends[oldest];
if (send->s_wr.send_flags & IB_SEND_SIGNALED)
nr_sig++;
if (send->s_op) {
if (send->s_op == rm->m_final_op) {
/* If anyone waited for this message to get flushed out, wake
* them up now */
rds_message_unmapped(rm);
}
rds_message_put(rm);
send->s_op = NULL;
}
rm = rds_ib_send_unmap_op(ic, send, wc->status);
oldest = (oldest + 1) % ic->i_send_ring.w_nr;
}
if (time_after(jiffies, send->s_queued + HZ / 2))
rds_ib_stats_inc(s_ib_tx_stalled);
rds_ib_ring_free(&ic->i_send_ring, completed);
rds_ib_sub_signaled(ic, nr_sig);
nr_sig = 0;
if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
test_bit(0, &conn->c_map_queued))
queue_delayed_work(rds_wq, &conn->c_send_w, 0);
/* We expect errors as the qp is drained during shutdown */
if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
rds_ib_conn_error(conn, "send completion on %pI4 had status "
"%u (%s), disconnecting and reconnecting\n",
&conn->c_faddr, wc.status,
ib_wc_status_msg(wc.status));
if (send->s_op) {
if (send->s_op == rm->m_final_op) {
/* If anyone waited for this message to get
* flushed out, wake them up now
*/
rds_message_unmapped(rm);
}
rds_message_put(rm);
send->s_op = NULL;
}
oldest = (oldest + 1) % ic->i_send_ring.w_nr;
}
rds_ib_ring_free(&ic->i_send_ring, completed);
rds_ib_sub_signaled(ic, nr_sig);
nr_sig = 0;
if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
test_bit(0, &conn->c_map_queued))
queue_delayed_work(rds_wq, &conn->c_send_w, 0);
/* We expect errors as the qp is drained during shutdown */
if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
rds_ib_conn_error(conn, "send completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
&conn->c_faddr, wc->status,
ib_wc_status_msg(wc->status));
}
}
......
......@@ -42,14 +42,14 @@ DEFINE_PER_CPU_SHARED_ALIGNED(struct rds_ib_statistics, rds_ib_stats);
static const char *const rds_ib_stat_names[] = {
"ib_connect_raced",
"ib_listen_closed_stale",
"ib_tx_cq_call",
"s_ib_evt_handler_call",
"ib_tasklet_call",
"ib_tx_cq_event",
"ib_tx_ring_full",
"ib_tx_throttle",
"ib_tx_sg_mapping_failure",
"ib_tx_stalled",
"ib_tx_credit_updates",
"ib_rx_cq_call",
"ib_rx_cq_event",
"ib_rx_ring_empty",
"ib_rx_refill_from_cq",
......@@ -61,12 +61,18 @@ static const char *const rds_ib_stat_names[] = {
"ib_ack_send_delayed",
"ib_ack_send_piggybacked",
"ib_ack_received",
"ib_rdma_mr_alloc",
"ib_rdma_mr_free",
"ib_rdma_mr_used",
"ib_rdma_mr_pool_flush",
"ib_rdma_mr_pool_wait",
"ib_rdma_mr_pool_depleted",
"ib_rdma_mr_8k_alloc",
"ib_rdma_mr_8k_free",
"ib_rdma_mr_8k_used",
"ib_rdma_mr_8k_pool_flush",
"ib_rdma_mr_8k_pool_wait",
"ib_rdma_mr_8k_pool_depleted",
"ib_rdma_mr_1m_alloc",
"ib_rdma_mr_1m_free",
"ib_rdma_mr_1m_used",
"ib_rdma_mr_1m_pool_flush",
"ib_rdma_mr_1m_pool_wait",
"ib_rdma_mr_1m_pool_depleted",
"ib_atomic_cswp",
"ib_atomic_fadd",
};
......
......@@ -605,6 +605,7 @@ extern wait_queue_head_t rds_poll_waitq;
int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len);
void rds_remove_bound(struct rds_sock *rs);
struct rds_sock *rds_find_bound(__be32 addr, __be16 port);
void rds_bind_lock_init(void);
/* cong.c */
int rds_cong_get_maps(struct rds_connection *conn);
......
......@@ -38,6 +38,7 @@
#include <linux/list.h>
#include <linux/ratelimit.h>
#include <linux/export.h>
#include <linux/sizes.h>
#include "rds.h"
......@@ -51,7 +52,7 @@
* it to 0 will restore the old behavior (where we looped until we had
* drained the queue).
*/
static int send_batch_count = 64;
static int send_batch_count = SZ_1K;
module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
......@@ -223,7 +224,7 @@ int rds_send_xmit(struct rds_connection *conn)
* through a lot of messages, lets back off and see
* if anyone else jumps in
*/
if (batch_count >= 1024)
if (batch_count >= send_batch_count)
goto over_batch;
spin_lock_irqsave(&conn->c_lock, flags);
......@@ -423,12 +424,15 @@ int rds_send_xmit(struct rds_connection *conn)
!list_empty(&conn->c_send_queue)) &&
send_gen == conn->c_send_gen) {
rds_stats_inc(s_send_lock_queue_raced);
goto restart;
if (batch_count < send_batch_count)
goto restart;
queue_delayed_work(rds_wq, &conn->c_send_w, 1);
}
}
out:
return ret;
}
EXPORT_SYMBOL_GPL(rds_send_xmit);
static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{
......@@ -1120,8 +1124,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
*/
rds_stats_inc(s_send_queued);
if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
rds_send_xmit(conn);
ret = rds_send_xmit(conn);
if (ret == -ENOMEM || ret == -EAGAIN)
queue_delayed_work(rds_wq, &conn->c_send_w, 1);
rds_message_put(rm);
return payload_len;
......@@ -1177,8 +1182,9 @@ rds_send_pong(struct rds_connection *conn, __be16 dport)
rds_stats_inc(s_send_queued);
rds_stats_inc(s_send_pong);
if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
queue_delayed_work(rds_wq, &conn->c_send_w, 0);
ret = rds_send_xmit(conn);
if (ret == -ENOMEM || ret == -EAGAIN)
queue_delayed_work(rds_wq, &conn->c_send_w, 1);
rds_message_put(rm);
return 0;
......
......@@ -162,7 +162,9 @@ void rds_send_worker(struct work_struct *work)
int ret;
if (rds_conn_state(conn) == RDS_CONN_UP) {
clear_bit(RDS_LL_SEND_FULL, &conn->c_flags);
ret = rds_send_xmit(conn);
cond_resched();
rdsdebug("conn %p ret %d\n", conn, ret);
switch (ret) {
case -EAGAIN:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册