提交 cf0ac2b8 编写于 作者: D David S. Miller

Merge branch 'for-davem' of git://oss.oracle.com/git/agrover/linux-2.6

......@@ -302,6 +302,7 @@ header-y += quota.h
header-y += radeonfb.h
header-y += random.h
header-y += raw.h
header-y += rds.h
header-y += reboot.h
header-y += reiserfs_fs.h
header-y += reiserfs_xattr.h
......
......@@ -73,6 +73,10 @@
#define RDS_CMSG_RDMA_MAP 3
#define RDS_CMSG_RDMA_STATUS 4
#define RDS_CMSG_CONG_UPDATE 5
#define RDS_CMSG_ATOMIC_FADD 6
#define RDS_CMSG_ATOMIC_CSWP 7
#define RDS_CMSG_MASKED_ATOMIC_FADD 8
#define RDS_CMSG_MASKED_ATOMIC_CSWP 9
#define RDS_INFO_FIRST 10000
#define RDS_INFO_COUNTERS 10000
......@@ -89,9 +93,9 @@
#define RDS_INFO_LAST 10010
struct rds_info_counter {
u_int8_t name[32];
u_int64_t value;
} __packed;
uint8_t name[32];
uint64_t value;
} __attribute__((packed));
#define RDS_INFO_CONNECTION_FLAG_SENDING 0x01
#define RDS_INFO_CONNECTION_FLAG_CONNECTING 0x02
......@@ -100,56 +104,48 @@ struct rds_info_counter {
#define TRANSNAMSIZ 16
struct rds_info_connection {
u_int64_t next_tx_seq;
u_int64_t next_rx_seq;
uint64_t next_tx_seq;
uint64_t next_rx_seq;
__be32 laddr;
__be32 faddr;
u_int8_t transport[TRANSNAMSIZ]; /* null term ascii */
u_int8_t flags;
} __packed;
struct rds_info_flow {
__be32 laddr;
__be32 faddr;
u_int32_t bytes;
__be16 lport;
__be16 fport;
} __packed;
uint8_t transport[TRANSNAMSIZ]; /* null term ascii */
uint8_t flags;
} __attribute__((packed));
#define RDS_INFO_MESSAGE_FLAG_ACK 0x01
#define RDS_INFO_MESSAGE_FLAG_FAST_ACK 0x02
struct rds_info_message {
u_int64_t seq;
u_int32_t len;
uint64_t seq;
uint32_t len;
__be32 laddr;
__be32 faddr;
__be16 lport;
__be16 fport;
u_int8_t flags;
} __packed;
uint8_t flags;
} __attribute__((packed));
struct rds_info_socket {
u_int32_t sndbuf;
uint32_t sndbuf;
__be32 bound_addr;
__be32 connected_addr;
__be16 bound_port;
__be16 connected_port;
u_int32_t rcvbuf;
u_int64_t inum;
} __packed;
uint32_t rcvbuf;
uint64_t inum;
} __attribute__((packed));
struct rds_info_tcp_socket {
__be32 local_addr;
__be16 local_port;
__be32 peer_addr;
__be16 peer_port;
u_int64_t hdr_rem;
u_int64_t data_rem;
u_int32_t last_sent_nxt;
u_int32_t last_expected_una;
u_int32_t last_seen_una;
} __packed;
uint64_t hdr_rem;
uint64_t data_rem;
uint32_t last_sent_nxt;
uint32_t last_expected_una;
uint32_t last_seen_una;
} __attribute__((packed));
#define RDS_IB_GID_LEN 16
struct rds_info_rdma_connection {
......@@ -203,42 +199,69 @@ struct rds_info_rdma_connection {
* (so that the application does not have to worry about
* alignment).
*/
typedef u_int64_t rds_rdma_cookie_t;
typedef uint64_t rds_rdma_cookie_t;
struct rds_iovec {
u_int64_t addr;
u_int64_t bytes;
uint64_t addr;
uint64_t bytes;
};
struct rds_get_mr_args {
struct rds_iovec vec;
u_int64_t cookie_addr;
uint64_t cookie_addr;
uint64_t flags;
};
struct rds_get_mr_for_dest_args {
struct sockaddr_storage dest_addr;
struct rds_iovec vec;
u_int64_t cookie_addr;
uint64_t cookie_addr;
uint64_t flags;
};
struct rds_free_mr_args {
rds_rdma_cookie_t cookie;
u_int64_t flags;
uint64_t flags;
};
struct rds_rdma_args {
rds_rdma_cookie_t cookie;
struct rds_iovec remote_vec;
u_int64_t local_vec_addr;
u_int64_t nr_local;
u_int64_t flags;
u_int64_t user_token;
uint64_t local_vec_addr;
uint64_t nr_local;
uint64_t flags;
uint64_t user_token;
};
struct rds_atomic_args {
rds_rdma_cookie_t cookie;
uint64_t local_addr;
uint64_t remote_addr;
union {
struct {
uint64_t compare;
uint64_t swap;
} cswp;
struct {
uint64_t add;
} fadd;
struct {
uint64_t compare;
uint64_t swap;
uint64_t compare_mask;
uint64_t swap_mask;
} m_cswp;
struct {
uint64_t add;
uint64_t nocarry_mask;
} m_fadd;
};
uint64_t flags;
uint64_t user_token;
};
struct rds_rdma_notify {
u_int64_t user_token;
uint64_t user_token;
int32_t status;
};
......@@ -257,5 +280,6 @@ struct rds_rdma_notify {
#define RDS_RDMA_USE_ONCE 0x0008 /* free MR after use */
#define RDS_RDMA_DONTWAIT 0x0010 /* Don't wait in SET_BARRIER */
#define RDS_RDMA_NOTIFY_ME 0x0020 /* Notify when operation completes */
#define RDS_RDMA_SILENT 0x0040 /* Do not interrupt remote */
#endif /* IB_RDS_H */
......@@ -39,7 +39,15 @@
#include <net/sock.h>
#include "rds.h"
#include "rdma.h"
char *rds_str_array(char **array, size_t elements, size_t index)
{
if ((index < elements) && array[index])
return array[index];
else
return "unknown";
}
EXPORT_SYMBOL(rds_str_array);
/* this is just used for stats gathering :/ */
static DEFINE_SPINLOCK(rds_sock_lock);
......@@ -62,7 +70,7 @@ static int rds_release(struct socket *sock)
struct rds_sock *rs;
unsigned long flags;
if (sk == NULL)
if (!sk)
goto out;
rs = rds_sk_to_rs(sk);
......@@ -73,7 +81,15 @@ static int rds_release(struct socket *sock)
* with the socket. */
rds_clear_recv_queue(rs);
rds_cong_remove_socket(rs);
/*
* the binding lookup hash uses rcu, we need to
* make sure we sychronize_rcu before we free our
* entry
*/
rds_remove_bound(rs);
synchronize_rcu();
rds_send_drop_to(rs, NULL);
rds_rdma_drop_keys(rs);
rds_notify_queue_get(rs, NULL);
......@@ -83,6 +99,8 @@ static int rds_release(struct socket *sock)
rds_sock_count--;
spin_unlock_irqrestore(&rds_sock_lock, flags);
rds_trans_put(rs->rs_transport);
sock->sk = NULL;
sock_put(sk);
out:
......@@ -514,7 +532,7 @@ static void rds_sock_info(struct socket *sock, unsigned int len,
spin_unlock_irqrestore(&rds_sock_lock, flags);
}
static void __exit rds_exit(void)
static void rds_exit(void)
{
sock_unregister(rds_family_ops.family);
proto_unregister(&rds_proto);
......@@ -529,7 +547,7 @@ static void __exit rds_exit(void)
}
module_exit(rds_exit);
static int __init rds_init(void)
static int rds_init(void)
{
int ret;
......
......@@ -34,45 +34,52 @@
#include <net/sock.h>
#include <linux/in.h>
#include <linux/if_arp.h>
#include <linux/jhash.h>
#include "rds.h"
/*
* XXX this probably still needs more work.. no INADDR_ANY, and rbtrees aren't
* particularly zippy.
*
* This is now called for every incoming frame so we arguably care much more
* about it than we used to.
*/
#define BIND_HASH_SIZE 1024
static struct hlist_head bind_hash_table[BIND_HASH_SIZE];
static DEFINE_SPINLOCK(rds_bind_lock);
static struct rb_root rds_bind_tree = RB_ROOT;
static struct rds_sock *rds_bind_tree_walk(__be32 addr, __be16 port,
struct rds_sock *insert)
static struct hlist_head *hash_to_bucket(__be32 addr, __be16 port)
{
return bind_hash_table + (jhash_2words((u32)addr, (u32)port, 0) &
(BIND_HASH_SIZE - 1));
}
static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port,
struct rds_sock *insert)
{
struct rb_node **p = &rds_bind_tree.rb_node;
struct rb_node *parent = NULL;
struct rds_sock *rs;
struct hlist_node *node;
struct hlist_head *head = hash_to_bucket(addr, port);
u64 cmp;
u64 needle = ((u64)be32_to_cpu(addr) << 32) | be16_to_cpu(port);
while (*p) {
parent = *p;
rs = rb_entry(parent, struct rds_sock, rs_bound_node);
rcu_read_lock();
hlist_for_each_entry_rcu(rs, node, head, rs_bound_node) {
cmp = ((u64)be32_to_cpu(rs->rs_bound_addr) << 32) |
be16_to_cpu(rs->rs_bound_port);
if (needle < cmp)
p = &(*p)->rb_left;
else if (needle > cmp)
p = &(*p)->rb_right;
else
if (cmp == needle) {
rcu_read_unlock();
return rs;
}
}
rcu_read_unlock();
if (insert) {
rb_link_node(&insert->rs_bound_node, parent, p);
rb_insert_color(&insert->rs_bound_node, &rds_bind_tree);
/*
* make sure our addr and port are set before
* we are added to the list, other people
* in rcu will find us as soon as the
* hlist_add_head_rcu is done
*/
insert->rs_bound_addr = addr;
insert->rs_bound_port = port;
rds_sock_addref(insert);
hlist_add_head_rcu(&insert->rs_bound_node, head);
}
return NULL;
}
......@@ -86,15 +93,13 @@ static struct rds_sock *rds_bind_tree_walk(__be32 addr, __be16 port,
struct rds_sock *rds_find_bound(__be32 addr, __be16 port)
{
struct rds_sock *rs;
unsigned long flags;
spin_lock_irqsave(&rds_bind_lock, flags);
rs = rds_bind_tree_walk(addr, port, NULL);
rs = rds_bind_lookup(addr, port, NULL);
if (rs && !sock_flag(rds_rs_to_sk(rs), SOCK_DEAD))
rds_sock_addref(rs);
else
rs = NULL;
spin_unlock_irqrestore(&rds_bind_lock, flags);
rdsdebug("returning rs %p for %pI4:%u\n", rs, &addr,
ntohs(port));
......@@ -121,22 +126,15 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
do {
if (rover == 0)
rover++;
if (rds_bind_tree_walk(addr, cpu_to_be16(rover), rs) == NULL) {
*port = cpu_to_be16(rover);
if (!rds_bind_lookup(addr, cpu_to_be16(rover), rs)) {
*port = rs->rs_bound_port;
ret = 0;
rdsdebug("rs %p binding to %pI4:%d\n",
rs, &addr, (int)ntohs(*port));
break;
}
} while (rover++ != last);
if (ret == 0) {
rs->rs_bound_addr = addr;
rs->rs_bound_port = *port;
rds_sock_addref(rs);
rdsdebug("rs %p binding to %pI4:%d\n",
rs, &addr, (int)ntohs(*port));
}
spin_unlock_irqrestore(&rds_bind_lock, flags);
return ret;
......@@ -153,7 +151,7 @@ void rds_remove_bound(struct rds_sock *rs)
rs, &rs->rs_bound_addr,
ntohs(rs->rs_bound_port));
rb_erase(&rs->rs_bound_node, &rds_bind_tree);
hlist_del_init_rcu(&rs->rs_bound_node);
rds_sock_put(rs);
rs->rs_bound_addr = 0;
}
......@@ -184,7 +182,7 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
goto out;
trans = rds_trans_get_preferred(sin->sin_addr.s_addr);
if (trans == NULL) {
if (!trans) {
ret = -EADDRNOTAVAIL;
rds_remove_bound(rs);
if (printk_ratelimit())
......@@ -198,5 +196,9 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
out:
release_sock(sk);
/* we might have called rds_remove_bound on error */
if (ret)
synchronize_rcu();
return ret;
}
......@@ -141,7 +141,7 @@ static struct rds_cong_map *rds_cong_from_addr(__be32 addr)
unsigned long flags;
map = kzalloc(sizeof(struct rds_cong_map), GFP_KERNEL);
if (map == NULL)
if (!map)
return NULL;
map->m_addr = addr;
......@@ -159,7 +159,7 @@ static struct rds_cong_map *rds_cong_from_addr(__be32 addr)
ret = rds_cong_tree_walk(addr, map);
spin_unlock_irqrestore(&rds_cong_lock, flags);
if (ret == NULL) {
if (!ret) {
ret = map;
map = NULL;
}
......@@ -205,7 +205,7 @@ int rds_cong_get_maps(struct rds_connection *conn)
conn->c_lcong = rds_cong_from_addr(conn->c_laddr);
conn->c_fcong = rds_cong_from_addr(conn->c_faddr);
if (conn->c_lcong == NULL || conn->c_fcong == NULL)
if (!(conn->c_lcong && conn->c_fcong))
return -ENOMEM;
return 0;
......@@ -221,7 +221,7 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
list_for_each_entry(conn, &map->m_conn_list, c_map_item) {
if (!test_and_set_bit(0, &conn->c_map_queued)) {
rds_stats_inc(s_cong_update_queued);
queue_delayed_work(rds_wq, &conn->c_send_w, 0);
rds_send_xmit(conn);
}
}
......
......@@ -37,7 +37,6 @@
#include "rds.h"
#include "loop.h"
#include "rdma.h"
#define RDS_CONNECTION_HASH_BITS 12
#define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS)
......@@ -63,18 +62,7 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
var |= RDS_INFO_CONNECTION_FLAG_##suffix; \
} while (0)
static inline int rds_conn_is_sending(struct rds_connection *conn)
{
int ret = 0;
if (!mutex_trylock(&conn->c_send_lock))
ret = 1;
else
mutex_unlock(&conn->c_send_lock);
return ret;
}
/* rcu read lock must be held or the connection spinlock */
static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
__be32 laddr, __be32 faddr,
struct rds_transport *trans)
......@@ -82,7 +70,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
struct rds_connection *conn, *ret = NULL;
struct hlist_node *pos;
hlist_for_each_entry(conn, pos, head, c_hash_node) {
hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
if (conn->c_faddr == faddr && conn->c_laddr == laddr &&
conn->c_trans == trans) {
ret = conn;
......@@ -129,10 +117,11 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
{
struct rds_connection *conn, *parent = NULL;
struct hlist_head *head = rds_conn_bucket(laddr, faddr);
struct rds_transport *loop_trans;
unsigned long flags;
int ret;
spin_lock_irqsave(&rds_conn_lock, flags);
rcu_read_lock();
conn = rds_conn_lookup(head, laddr, faddr, trans);
if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
!is_outgoing) {
......@@ -143,12 +132,12 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
parent = conn;
conn = parent->c_passive;
}
spin_unlock_irqrestore(&rds_conn_lock, flags);
rcu_read_unlock();
if (conn)
goto out;
conn = kmem_cache_zalloc(rds_conn_slab, gfp);
if (conn == NULL) {
if (!conn) {
conn = ERR_PTR(-ENOMEM);
goto out;
}
......@@ -159,7 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
spin_lock_init(&conn->c_lock);
conn->c_next_tx_seq = 1;
mutex_init(&conn->c_send_lock);
init_waitqueue_head(&conn->c_waitq);
INIT_LIST_HEAD(&conn->c_send_queue);
INIT_LIST_HEAD(&conn->c_retrans);
......@@ -175,7 +164,9 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
* can bind to the destination address then we'd rather the messages
* flow through loopback rather than either transport.
*/
if (rds_trans_get_preferred(faddr)) {
loop_trans = rds_trans_get_preferred(faddr);
if (loop_trans) {
rds_trans_put(loop_trans);
conn->c_loopback = 1;
if (is_outgoing && trans->t_prefer_loopback) {
/* "outgoing" connection - and the transport
......@@ -238,7 +229,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
kmem_cache_free(rds_conn_slab, conn);
conn = found;
} else {
hlist_add_head(&conn->c_hash_node, head);
hlist_add_head_rcu(&conn->c_hash_node, head);
rds_cong_add_conn(conn);
rds_conn_count++;
}
......@@ -263,21 +254,91 @@ struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr,
}
EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
void rds_conn_shutdown(struct rds_connection *conn)
{
/* shut it down unless it's down already */
if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
/*
* Quiesce the connection mgmt handlers before we start tearing
* things down. We don't hold the mutex for the entire
* duration of the shutdown operation, else we may be
* deadlocking with the CM handler. Instead, the CM event
* handler is supposed to check for state DISCONNECTING
*/
mutex_lock(&conn->c_cm_lock);
if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING)
&& !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) {
rds_conn_error(conn, "shutdown called in state %d\n",
atomic_read(&conn->c_state));
mutex_unlock(&conn->c_cm_lock);
return;
}
mutex_unlock(&conn->c_cm_lock);
wait_event(conn->c_waitq,
!test_bit(RDS_IN_XMIT, &conn->c_flags));
conn->c_trans->conn_shutdown(conn);
rds_conn_reset(conn);
if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
/* This can happen - eg when we're in the middle of tearing
* down the connection, and someone unloads the rds module.
* Quite reproduceable with loopback connections.
* Mostly harmless.
*/
rds_conn_error(conn,
"%s: failed to transition to state DOWN, "
"current state is %d\n",
__func__,
atomic_read(&conn->c_state));
return;
}
}
/* Then reconnect if it's still live.
* The passive side of an IB loopback connection is never added
* to the conn hash, so we never trigger a reconnect on this
* conn - the reconnect is always triggered by the active peer. */
cancel_delayed_work_sync(&conn->c_conn_w);
rcu_read_lock();
if (!hlist_unhashed(&conn->c_hash_node)) {
rcu_read_unlock();
rds_queue_reconnect(conn);
} else {
rcu_read_unlock();
}
}
/*
* Stop and free a connection.
*
* This can only be used in very limited circumstances. It assumes that once
* the conn has been shutdown that no one else is referencing the connection.
* We can only ensure this in the rmmod path in the current code.
*/
void rds_conn_destroy(struct rds_connection *conn)
{
struct rds_message *rm, *rtmp;
unsigned long flags;
rdsdebug("freeing conn %p for %pI4 -> "
"%pI4\n", conn, &conn->c_laddr,
&conn->c_faddr);
hlist_del_init(&conn->c_hash_node);
/* Ensure conn will not be scheduled for reconnect */
spin_lock_irq(&rds_conn_lock);
hlist_del_init_rcu(&conn->c_hash_node);
spin_unlock_irq(&rds_conn_lock);
synchronize_rcu();
/* wait for the rds thread to shut it down */
atomic_set(&conn->c_state, RDS_CONN_ERROR);
cancel_delayed_work(&conn->c_conn_w);
queue_work(rds_wq, &conn->c_down_w);
flush_workqueue(rds_wq);
/* shut the connection down */
rds_conn_drop(conn);
flush_work(&conn->c_down_w);
/* make sure lingering queued work won't try to ref the conn */
cancel_delayed_work_sync(&conn->c_send_w);
cancel_delayed_work_sync(&conn->c_recv_w);
/* tear down queued messages */
list_for_each_entry_safe(rm, rtmp,
......@@ -302,7 +363,9 @@ void rds_conn_destroy(struct rds_connection *conn)
BUG_ON(!list_empty(&conn->c_retrans));
kmem_cache_free(rds_conn_slab, conn);
spin_lock_irqsave(&rds_conn_lock, flags);
rds_conn_count--;
spin_unlock_irqrestore(&rds_conn_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_conn_destroy);
......@@ -316,23 +379,23 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
struct list_head *list;
struct rds_connection *conn;
struct rds_message *rm;
unsigned long flags;
unsigned int total = 0;
unsigned long flags;
size_t i;
len /= sizeof(struct rds_info_message);
spin_lock_irqsave(&rds_conn_lock, flags);
rcu_read_lock();
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
hlist_for_each_entry(conn, pos, head, c_hash_node) {
hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
if (want_send)
list = &conn->c_send_queue;
else
list = &conn->c_retrans;
spin_lock(&conn->c_lock);
spin_lock_irqsave(&conn->c_lock, flags);
/* XXX too lazy to maintain counts.. */
list_for_each_entry(rm, list, m_conn_item) {
......@@ -343,11 +406,10 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
conn->c_faddr, 0);
}
spin_unlock(&conn->c_lock);
spin_unlock_irqrestore(&conn->c_lock, flags);
}
}
spin_unlock_irqrestore(&rds_conn_lock, flags);
rcu_read_unlock();
lens->nr = total;
lens->each = sizeof(struct rds_info_message);
......@@ -377,19 +439,17 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
uint64_t buffer[(item_len + 7) / 8];
struct hlist_head *head;
struct hlist_node *pos;
struct hlist_node *tmp;
struct rds_connection *conn;
unsigned long flags;
size_t i;
spin_lock_irqsave(&rds_conn_lock, flags);
rcu_read_lock();
lens->nr = 0;
lens->each = item_len;
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) {
hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
/* XXX no c_lock usage.. */
if (!visitor(conn, buffer))
......@@ -405,8 +465,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
lens->nr++;
}
}
spin_unlock_irqrestore(&rds_conn_lock, flags);
rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
......@@ -423,8 +482,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
sizeof(cinfo->transport));
cinfo->flags = 0;
rds_conn_info_set(cinfo->flags,
rds_conn_is_sending(conn), SENDING);
rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
SENDING);
/* XXX Future: return the state rather than these funky bits */
rds_conn_info_set(cinfo->flags,
atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
......@@ -444,12 +503,12 @@ static void rds_conn_info(struct socket *sock, unsigned int len,
sizeof(struct rds_info_connection));
}
int __init rds_conn_init(void)
int rds_conn_init(void)
{
rds_conn_slab = kmem_cache_create("rds_connection",
sizeof(struct rds_connection),
0, 0, NULL);
if (rds_conn_slab == NULL)
if (!rds_conn_slab)
return -ENOMEM;
rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info);
......@@ -486,6 +545,18 @@ void rds_conn_drop(struct rds_connection *conn)
}
EXPORT_SYMBOL_GPL(rds_conn_drop);
/*
* If the connection is down, trigger a connect. We may have scheduled a
* delayed reconnect however - in this case we should not interfere.
*/
void rds_conn_connect_if_down(struct rds_connection *conn)
{
if (rds_conn_state(conn) == RDS_CONN_DOWN &&
!test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
}
EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);
/*
* An error occurred on the connection
*/
......
......@@ -53,12 +53,71 @@ MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer");
module_param(rds_ib_retry_count, int, 0444);
MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error");
/*
* we have a clumsy combination of RCU and a rwsem protecting this list
* because it is used both in the get_mr fast path and while blocking in
* the FMR flushing path.
*/
DECLARE_RWSEM(rds_ib_devices_lock);
struct list_head rds_ib_devices;
/* NOTE: if also grabbing ibdev lock, grab this first */
DEFINE_SPINLOCK(ib_nodev_conns_lock);
LIST_HEAD(ib_nodev_conns);
void rds_ib_nodev_connect(void)
{
struct rds_ib_connection *ic;
spin_lock(&ib_nodev_conns_lock);
list_for_each_entry(ic, &ib_nodev_conns, ib_node)
rds_conn_connect_if_down(ic->conn);
spin_unlock(&ib_nodev_conns_lock);
}
void rds_ib_dev_shutdown(struct rds_ib_device *rds_ibdev)
{
struct rds_ib_connection *ic;
unsigned long flags;
spin_lock_irqsave(&rds_ibdev->spinlock, flags);
list_for_each_entry(ic, &rds_ibdev->conn_list, ib_node)
rds_conn_drop(ic->conn);
spin_unlock_irqrestore(&rds_ibdev->spinlock, flags);
}
/*
* rds_ib_destroy_mr_pool() blocks on a few things and mrs drop references
* from interrupt context so we push freing off into a work struct in krdsd.
*/
static void rds_ib_dev_free(struct work_struct *work)
{
struct rds_ib_ipaddr *i_ipaddr, *i_next;
struct rds_ib_device *rds_ibdev = container_of(work,
struct rds_ib_device, free_work);
if (rds_ibdev->mr_pool)
rds_ib_destroy_mr_pool(rds_ibdev->mr_pool);
if (rds_ibdev->mr)
ib_dereg_mr(rds_ibdev->mr);
if (rds_ibdev->pd)
ib_dealloc_pd(rds_ibdev->pd);
list_for_each_entry_safe(i_ipaddr, i_next, &rds_ibdev->ipaddr_list, list) {
list_del(&i_ipaddr->list);
kfree(i_ipaddr);
}
kfree(rds_ibdev);
}
void rds_ib_dev_put(struct rds_ib_device *rds_ibdev)
{
BUG_ON(atomic_read(&rds_ibdev->refcount) <= 0);
if (atomic_dec_and_test(&rds_ibdev->refcount))
queue_work(rds_wq, &rds_ibdev->free_work);
}
void rds_ib_add_one(struct ib_device *device)
{
struct rds_ib_device *rds_ibdev;
......@@ -77,11 +136,14 @@ void rds_ib_add_one(struct ib_device *device)
goto free_attr;
}
rds_ibdev = kmalloc(sizeof *rds_ibdev, GFP_KERNEL);
rds_ibdev = kzalloc_node(sizeof(struct rds_ib_device), GFP_KERNEL,
ibdev_to_node(device));
if (!rds_ibdev)
goto free_attr;
spin_lock_init(&rds_ibdev->spinlock);
atomic_set(&rds_ibdev->refcount, 1);
INIT_WORK(&rds_ibdev->free_work, rds_ib_dev_free);
rds_ibdev->max_wrs = dev_attr->max_qp_wr;
rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
......@@ -91,68 +153,107 @@ void rds_ib_add_one(struct ib_device *device)
min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
fmr_pool_size;
rds_ibdev->max_initiator_depth = dev_attr->max_qp_init_rd_atom;
rds_ibdev->max_responder_resources = dev_attr->max_qp_rd_atom;
rds_ibdev->dev = device;
rds_ibdev->pd = ib_alloc_pd(device);
if (IS_ERR(rds_ibdev->pd))
goto free_dev;
if (IS_ERR(rds_ibdev->pd)) {
rds_ibdev->pd = NULL;
goto put_dev;
}
rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
IB_ACCESS_LOCAL_WRITE);
if (IS_ERR(rds_ibdev->mr))
goto err_pd;
rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd, IB_ACCESS_LOCAL_WRITE);
if (IS_ERR(rds_ibdev->mr)) {
rds_ibdev->mr = NULL;
goto put_dev;
}
rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
if (IS_ERR(rds_ibdev->mr_pool)) {
rds_ibdev->mr_pool = NULL;
goto err_mr;
goto put_dev;
}
INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
INIT_LIST_HEAD(&rds_ibdev->conn_list);
list_add_tail(&rds_ibdev->list, &rds_ib_devices);
down_write(&rds_ib_devices_lock);
list_add_tail_rcu(&rds_ibdev->list, &rds_ib_devices);
up_write(&rds_ib_devices_lock);
atomic_inc(&rds_ibdev->refcount);
ib_set_client_data(device, &rds_ib_client, rds_ibdev);
atomic_inc(&rds_ibdev->refcount);
goto free_attr;
rds_ib_nodev_connect();
err_mr:
ib_dereg_mr(rds_ibdev->mr);
err_pd:
ib_dealloc_pd(rds_ibdev->pd);
free_dev:
kfree(rds_ibdev);
put_dev:
rds_ib_dev_put(rds_ibdev);
free_attr:
kfree(dev_attr);
}
/*
* New connections use this to find the device to associate with the
* connection. It's not in the fast path so we're not concerned about the
* performance of the IB call. (As of this writing, it uses an interrupt
* blocking spinlock to serialize walking a per-device list of all registered
* clients.)
*
* RCU is used to handle incoming connections racing with device teardown.
* Rather than use a lock to serialize removal from the client_data and
* getting a new reference, we use an RCU grace period. The destruction
* path removes the device from client_data and then waits for all RCU
* readers to finish.
*
* A new connection can get NULL from this if its arriving on a
* device that is in the process of being removed.
*/
struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device)
{
struct rds_ib_device *rds_ibdev;
rcu_read_lock();
rds_ibdev = ib_get_client_data(device, &rds_ib_client);
if (rds_ibdev)
atomic_inc(&rds_ibdev->refcount);
rcu_read_unlock();
return rds_ibdev;
}
/*
* The IB stack is letting us know that a device is going away. This can
* happen if the underlying HCA driver is removed or if PCI hotplug is removing
* the pci function, for example.
*
* This can be called at any time and can be racing with any other RDS path.
*/
void rds_ib_remove_one(struct ib_device *device)
{
struct rds_ib_device *rds_ibdev;
struct rds_ib_ipaddr *i_ipaddr, *i_next;
rds_ibdev = ib_get_client_data(device, &rds_ib_client);
if (!rds_ibdev)
return;
list_for_each_entry_safe(i_ipaddr, i_next, &rds_ibdev->ipaddr_list, list) {
list_del(&i_ipaddr->list);
kfree(i_ipaddr);
}
rds_ib_dev_shutdown(rds_ibdev);
rds_ib_destroy_conns(rds_ibdev);
/* stop connection attempts from getting a reference to this device. */
ib_set_client_data(device, &rds_ib_client, NULL);
if (rds_ibdev->mr_pool)
rds_ib_destroy_mr_pool(rds_ibdev->mr_pool);
ib_dereg_mr(rds_ibdev->mr);
while (ib_dealloc_pd(rds_ibdev->pd)) {
rdsdebug("Failed to dealloc pd %p\n", rds_ibdev->pd);
msleep(1);
}
down_write(&rds_ib_devices_lock);
list_del_rcu(&rds_ibdev->list);
up_write(&rds_ib_devices_lock);
list_del(&rds_ibdev->list);
kfree(rds_ibdev);
/*
* This synchronize rcu is waiting for readers of both the ib
* client data and the devices list to finish before we drop
* both of those references.
*/
synchronize_rcu();
rds_ib_dev_put(rds_ibdev);
rds_ib_dev_put(rds_ibdev);
}
struct ib_client rds_ib_client = {
......@@ -186,7 +287,7 @@ static int rds_ib_conn_info_visitor(struct rds_connection *conn,
rdma_addr_get_sgid(dev_addr, (union ib_gid *) &iinfo->src_gid);
rdma_addr_get_dgid(dev_addr, (union ib_gid *) &iinfo->dst_gid);
rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
rds_ibdev = ic->rds_ibdev;
iinfo->max_send_wr = ic->i_send_ring.w_nr;
iinfo->max_recv_wr = ic->i_recv_ring.w_nr;
iinfo->max_send_sge = rds_ibdev->max_sge;
......@@ -248,29 +349,36 @@ static int rds_ib_laddr_check(__be32 addr)
return ret;
}
static void rds_ib_unregister_client(void)
{
ib_unregister_client(&rds_ib_client);
/* wait for rds_ib_dev_free() to complete */
flush_workqueue(rds_wq);
}
void rds_ib_exit(void)
{
rds_info_deregister_func(RDS_INFO_IB_CONNECTIONS, rds_ib_ic_info);
rds_ib_unregister_client();
rds_ib_destroy_nodev_conns();
ib_unregister_client(&rds_ib_client);
rds_ib_sysctl_exit();
rds_ib_recv_exit();
rds_trans_unregister(&rds_ib_transport);
rds_ib_fmr_exit();
}
struct rds_transport rds_ib_transport = {
.laddr_check = rds_ib_laddr_check,
.xmit_complete = rds_ib_xmit_complete,
.xmit = rds_ib_xmit,
.xmit_cong_map = NULL,
.xmit_rdma = rds_ib_xmit_rdma,
.xmit_atomic = rds_ib_xmit_atomic,
.recv = rds_ib_recv,
.conn_alloc = rds_ib_conn_alloc,
.conn_free = rds_ib_conn_free,
.conn_connect = rds_ib_conn_connect,
.conn_shutdown = rds_ib_conn_shutdown,
.inc_copy_to_user = rds_ib_inc_copy_to_user,
.inc_purge = rds_ib_inc_purge,
.inc_free = rds_ib_inc_free,
.cm_initiate_connect = rds_ib_cm_initiate_connect,
.cm_handle_connect = rds_ib_cm_handle_connect,
......@@ -286,16 +394,20 @@ struct rds_transport rds_ib_transport = {
.t_type = RDS_TRANS_IB
};
int __init rds_ib_init(void)
int rds_ib_init(void)
{
int ret;
INIT_LIST_HEAD(&rds_ib_devices);
ret = ib_register_client(&rds_ib_client);
ret = rds_ib_fmr_init();
if (ret)
goto out;
ret = ib_register_client(&rds_ib_client);
if (ret)
goto out_fmr_exit;
ret = rds_ib_sysctl_init();
if (ret)
goto out_ibreg;
......@@ -317,7 +429,9 @@ int __init rds_ib_init(void)
out_sysctl:
rds_ib_sysctl_exit();
out_ibreg:
ib_unregister_client(&rds_ib_client);
rds_ib_unregister_client();
out_fmr_exit:
rds_ib_fmr_exit();
out:
return ret;
}
......
......@@ -3,11 +3,13 @@
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/pci.h>
#include <linux/slab.h>
#include "rds.h"
#include "rdma_transport.h"
#define RDS_FMR_SIZE 256
#define RDS_FMR_POOL_SIZE 4096
#define RDS_FMR_POOL_SIZE 8192
#define RDS_IB_MAX_SGE 8
#define RDS_IB_RECV_SGE 2
......@@ -19,6 +21,9 @@
#define RDS_IB_SUPPORTED_PROTOCOLS 0x00000003 /* minor versions supported */
#define RDS_IB_RECYCLE_BATCH_COUNT 32
extern struct rw_semaphore rds_ib_devices_lock;
extern struct list_head rds_ib_devices;
/*
......@@ -26,20 +31,29 @@ extern struct list_head rds_ib_devices;
* try and minimize the amount of memory tied up both the device and
* socket receive queues.
*/
/* page offset of the final full frag that fits in the page */
#define RDS_PAGE_LAST_OFF (((PAGE_SIZE / RDS_FRAG_SIZE) - 1) * RDS_FRAG_SIZE)
struct rds_page_frag {
struct list_head f_item;
struct page *f_page;
unsigned long f_offset;
dma_addr_t f_mapped;
struct list_head f_cache_entry;
struct scatterlist f_sg;
};
struct rds_ib_incoming {
struct list_head ii_frags;
struct list_head ii_cache_entry;
struct rds_incoming ii_inc;
};
struct rds_ib_cache_head {
struct list_head *first;
unsigned long count;
};
struct rds_ib_refill_cache {
struct rds_ib_cache_head *percpu;
struct list_head *xfer;
struct list_head *ready;
};
struct rds_ib_connect_private {
/* Add new fields at the end, and don't permute existing fields. */
__be32 dp_saddr;
......@@ -53,8 +67,7 @@ struct rds_ib_connect_private {
};
struct rds_ib_send_work {
struct rds_message *s_rm;
struct rds_rdma_op *s_op;
void *s_op;
struct ib_send_wr s_wr;
struct ib_sge s_sge[RDS_IB_MAX_SGE];
unsigned long s_queued;
......@@ -92,10 +105,11 @@ struct rds_ib_connection {
/* tx */
struct rds_ib_work_ring i_send_ring;
struct rds_message *i_rm;
struct rm_data_op *i_data_op;
struct rds_header *i_send_hdrs;
u64 i_send_hdrs_dma;
struct rds_ib_send_work *i_sends;
atomic_t i_signaled_sends;
/* rx */
struct tasklet_struct i_recv_tasklet;
......@@ -106,8 +120,9 @@ struct rds_ib_connection {
struct rds_header *i_recv_hdrs;
u64 i_recv_hdrs_dma;
struct rds_ib_recv_work *i_recvs;
struct rds_page_frag i_frag;
u64 i_ack_recv; /* last ACK received */
struct rds_ib_refill_cache i_cache_incs;
struct rds_ib_refill_cache i_cache_frags;
/* sending acks */
unsigned long i_ack_flags;
......@@ -138,7 +153,6 @@ struct rds_ib_connection {
/* Batched completions */
unsigned int i_unsignaled_wrs;
long i_unsignaled_bytes;
};
/* This assumes that atomic_t is at least 32 bits */
......@@ -164,9 +178,17 @@ struct rds_ib_device {
unsigned int max_fmrs;
int max_sge;
unsigned int max_wrs;
unsigned int max_initiator_depth;
unsigned int max_responder_resources;
spinlock_t spinlock; /* protect the above */
atomic_t refcount;
struct work_struct free_work;
};
#define pcidev_to_node(pcidev) pcibus_to_node(pcidev->bus)
#define ibdev_to_node(ibdev) pcidev_to_node(to_pci_dev(ibdev->dma_device))
#define rdsibdev_to_node(rdsibdev) ibdev_to_node(rdsibdev->dev)
/* bits for i_ack_flags */
#define IB_ACK_IN_FLIGHT 0
#define IB_ACK_REQUESTED 1
......@@ -202,6 +224,8 @@ struct rds_ib_statistics {
uint64_t s_ib_rdma_mr_pool_flush;
uint64_t s_ib_rdma_mr_pool_wait;
uint64_t s_ib_rdma_mr_pool_depleted;
uint64_t s_ib_atomic_cswp;
uint64_t s_ib_atomic_fadd;
};
extern struct workqueue_struct *rds_ib_wq;
......@@ -243,6 +267,8 @@ static inline void rds_ib_dma_sync_sg_for_device(struct ib_device *dev,
extern struct rds_transport rds_ib_transport;
extern void rds_ib_add_one(struct ib_device *device);
extern void rds_ib_remove_one(struct ib_device *device);
struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device);
void rds_ib_dev_put(struct rds_ib_device *rds_ibdev);
extern struct ib_client rds_ib_client;
extern unsigned int fmr_pool_size;
......@@ -258,7 +284,7 @@ void rds_ib_conn_free(void *arg);
int rds_ib_conn_connect(struct rds_connection *conn);
void rds_ib_conn_shutdown(struct rds_connection *conn);
void rds_ib_state_change(struct sock *sk);
int __init rds_ib_listen_init(void);
int rds_ib_listen_init(void);
void rds_ib_listen_stop(void);
void __rds_ib_conn_error(struct rds_connection *conn, const char *, ...);
int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
......@@ -275,15 +301,7 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn,
int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
void __rds_ib_destroy_conns(struct list_head *list, spinlock_t *list_lock);
static inline void rds_ib_destroy_nodev_conns(void)
{
__rds_ib_destroy_conns(&ib_nodev_conns, &ib_nodev_conns_lock);
}
static inline void rds_ib_destroy_conns(struct rds_ib_device *rds_ibdev)
{
__rds_ib_destroy_conns(&rds_ibdev->conn_list, &rds_ibdev->spinlock);
}
void rds_ib_destroy_nodev_conns(void);
struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo);
void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
......@@ -292,14 +310,16 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
void rds_ib_sync_mr(void *trans_private, int dir);
void rds_ib_free_mr(void *trans_private, int invalidate);
void rds_ib_flush_mrs(void);
int rds_ib_fmr_init(void);
void rds_ib_fmr_exit(void);
/* ib_recv.c */
int __init rds_ib_recv_init(void);
int rds_ib_recv_init(void);
void rds_ib_recv_exit(void);
int rds_ib_recv(struct rds_connection *conn);
int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
gfp_t page_gfp, int prefill);
void rds_ib_inc_purge(struct rds_incoming *inc);
int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic);
void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
void rds_ib_recv_refill(struct rds_connection *conn, int prefill);
void rds_ib_inc_free(struct rds_incoming *inc);
int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
size_t size);
......@@ -325,17 +345,19 @@ u32 rds_ib_ring_completed(struct rds_ib_work_ring *ring, u32 wr_id, u32 oldest);
extern wait_queue_head_t rds_ib_ring_empty_wait;
/* ib_send.c */
char *rds_ib_wc_status_str(enum ib_wc_status status);
void rds_ib_xmit_complete(struct rds_connection *conn);
int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg, unsigned int off);
void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
void rds_ib_send_init_ring(struct rds_ib_connection *ic);
void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op);
void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
u32 *adv_credits, int need_posted, int max_posted);
int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);
/* ib_stats.c */
DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
......@@ -344,7 +366,7 @@ unsigned int rds_ib_stats_info_copy(struct rds_info_iterator *iter,
unsigned int avail);
/* ib_sysctl.c */
int __init rds_ib_sysctl_init(void);
int rds_ib_sysctl_init(void);
void rds_ib_sysctl_exit(void);
extern unsigned long rds_ib_sysctl_max_send_wr;
extern unsigned long rds_ib_sysctl_max_recv_wr;
......@@ -354,28 +376,4 @@ extern unsigned long rds_ib_sysctl_max_recv_allocation;
extern unsigned int rds_ib_sysctl_flow_control;
extern ctl_table rds_ib_sysctl_table[];
/*
* Helper functions for getting/setting the header and data SGEs in
* RDS packets (not RDMA)
*
* From version 3.1 onwards, header is in front of data in the sge.
*/
static inline struct ib_sge *
rds_ib_header_sge(struct rds_ib_connection *ic, struct ib_sge *sge)
{
if (ic->conn->c_version > RDS_PROTOCOL_3_0)
return &sge[0];
else
return &sge[1];
}
static inline struct ib_sge *
rds_ib_data_sge(struct rds_ib_connection *ic, struct ib_sge *sge)
{
if (ic->conn->c_version > RDS_PROTOCOL_3_0)
return &sge[1];
else
return &sge[0];
}
#endif
......@@ -38,6 +38,36 @@
#include "rds.h"
#include "ib.h"
static char *rds_ib_event_type_strings[] = {
#define RDS_IB_EVENT_STRING(foo) \
[IB_EVENT_##foo] = __stringify(IB_EVENT_##foo)
RDS_IB_EVENT_STRING(CQ_ERR),
RDS_IB_EVENT_STRING(QP_FATAL),
RDS_IB_EVENT_STRING(QP_REQ_ERR),
RDS_IB_EVENT_STRING(QP_ACCESS_ERR),
RDS_IB_EVENT_STRING(COMM_EST),
RDS_IB_EVENT_STRING(SQ_DRAINED),
RDS_IB_EVENT_STRING(PATH_MIG),
RDS_IB_EVENT_STRING(PATH_MIG_ERR),
RDS_IB_EVENT_STRING(DEVICE_FATAL),
RDS_IB_EVENT_STRING(PORT_ACTIVE),
RDS_IB_EVENT_STRING(PORT_ERR),
RDS_IB_EVENT_STRING(LID_CHANGE),
RDS_IB_EVENT_STRING(PKEY_CHANGE),
RDS_IB_EVENT_STRING(SM_CHANGE),
RDS_IB_EVENT_STRING(SRQ_ERR),
RDS_IB_EVENT_STRING(SRQ_LIMIT_REACHED),
RDS_IB_EVENT_STRING(QP_LAST_WQE_REACHED),
RDS_IB_EVENT_STRING(CLIENT_REREGISTER),
#undef RDS_IB_EVENT_STRING
};
static char *rds_ib_event_str(enum ib_event_type type)
{
return rds_str_array(rds_ib_event_type_strings,
ARRAY_SIZE(rds_ib_event_type_strings), type);
};
/*
* Set the selected protocol version
*/
......@@ -95,7 +125,6 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
{
const struct rds_ib_connect_private *dp = NULL;
struct rds_ib_connection *ic = conn->c_transport_data;
struct rds_ib_device *rds_ibdev;
struct ib_qp_attr qp_attr;
int err;
......@@ -111,11 +140,21 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
}
}
printk(KERN_NOTICE "RDS/IB: connected to %pI4 version %u.%u%s\n",
&conn->c_faddr,
RDS_PROTOCOL_MAJOR(conn->c_version),
RDS_PROTOCOL_MINOR(conn->c_version),
ic->i_flowctl ? ", flow control" : "");
if (conn->c_version < RDS_PROTOCOL(3,1)) {
printk(KERN_NOTICE "RDS/IB: Connection to %pI4 version %u.%u failed,"
" no longer supported\n",
&conn->c_faddr,
RDS_PROTOCOL_MAJOR(conn->c_version),
RDS_PROTOCOL_MINOR(conn->c_version));
rds_conn_destroy(conn);
return;
} else {
printk(KERN_NOTICE "RDS/IB: connected to %pI4 version %u.%u%s\n",
&conn->c_faddr,
RDS_PROTOCOL_MAJOR(conn->c_version),
RDS_PROTOCOL_MINOR(conn->c_version),
ic->i_flowctl ? ", flow control" : "");
}
/*
* Init rings and fill recv. this needs to wait until protocol negotiation
......@@ -125,7 +164,7 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
rds_ib_recv_init_ring(ic);
/* Post receive buffers - as a side effect, this will update
* the posted credit count. */
rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
rds_ib_recv_refill(conn, 1);
/* Tune RNR behavior */
rds_ib_tune_rnr(ic, &qp_attr);
......@@ -135,12 +174,11 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
if (err)
printk(KERN_NOTICE "ib_modify_qp(IB_QP_STATE, RTS): err=%d\n", err);
/* update ib_device with this local ipaddr & conn */
rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
err = rds_ib_update_ipaddr(rds_ibdev, conn->c_laddr);
/* update ib_device with this local ipaddr */
err = rds_ib_update_ipaddr(ic->rds_ibdev, conn->c_laddr);
if (err)
printk(KERN_ERR "rds_ib_update_ipaddr failed (%d)\n", err);
rds_ib_add_conn(rds_ibdev, conn);
printk(KERN_ERR "rds_ib_update_ipaddr failed (%d)\n",
err);
/* If the peer gave us the last packet it saw, process this as if
* we had received a regular ACK. */
......@@ -153,18 +191,23 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
static void rds_ib_cm_fill_conn_param(struct rds_connection *conn,
struct rdma_conn_param *conn_param,
struct rds_ib_connect_private *dp,
u32 protocol_version)
u32 protocol_version,
u32 max_responder_resources,
u32 max_initiator_depth)
{
struct rds_ib_connection *ic = conn->c_transport_data;
struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
memset(conn_param, 0, sizeof(struct rdma_conn_param));
/* XXX tune these? */
conn_param->responder_resources = 1;
conn_param->initiator_depth = 1;
conn_param->responder_resources =
min_t(u32, rds_ibdev->max_responder_resources, max_responder_resources);
conn_param->initiator_depth =
min_t(u32, rds_ibdev->max_initiator_depth, max_initiator_depth);
conn_param->retry_count = min_t(unsigned int, rds_ib_retry_count, 7);
conn_param->rnr_retry_count = 7;
if (dp) {
struct rds_ib_connection *ic = conn->c_transport_data;
memset(dp, 0, sizeof(*dp));
dp->dp_saddr = conn->c_laddr;
dp->dp_daddr = conn->c_faddr;
......@@ -189,7 +232,8 @@ static void rds_ib_cm_fill_conn_param(struct rds_connection *conn,
static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
{
rdsdebug("event %u data %p\n", event->event, data);
rdsdebug("event %u (%s) data %p\n",
event->event, rds_ib_event_str(event->event), data);
}
static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
......@@ -197,16 +241,18 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
struct rds_connection *conn = data;
struct rds_ib_connection *ic = conn->c_transport_data;
rdsdebug("conn %p ic %p event %u\n", conn, ic, event->event);
rdsdebug("conn %p ic %p event %u (%s)\n", conn, ic, event->event,
rds_ib_event_str(event->event));
switch (event->event) {
case IB_EVENT_COMM_EST:
rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
break;
default:
rdsdebug("Fatal QP Event %u "
rdsdebug("Fatal QP Event %u (%s) "
"- connection %pI4->%pI4, reconnecting\n",
event->event, &conn->c_laddr, &conn->c_faddr);
event->event, rds_ib_event_str(event->event),
&conn->c_laddr, &conn->c_faddr);
rds_conn_drop(conn);
break;
}
......@@ -224,18 +270,16 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
struct rds_ib_device *rds_ibdev;
int ret;
/* rds_ib_add_one creates a rds_ib_device object per IB device,
* and allocates a protection domain, memory range and FMR pool
* for each. If that fails for any reason, it will not register
* the rds_ibdev at all.
/*
* It's normal to see a null device if an incoming connection races
* with device removal, so we don't print a warning.
*/
rds_ibdev = ib_get_client_data(dev, &rds_ib_client);
if (rds_ibdev == NULL) {
if (printk_ratelimit())
printk(KERN_NOTICE "RDS/IB: No client_data for device %s\n",
dev->name);
rds_ibdev = rds_ib_get_client_data(dev);
if (!rds_ibdev)
return -EOPNOTSUPP;
}
/* add the conn now so that connection establishment has the dev */
rds_ib_add_conn(rds_ibdev, conn);
if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1)
rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1);
......@@ -306,7 +350,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
ic->i_send_ring.w_nr *
sizeof(struct rds_header),
&ic->i_send_hdrs_dma, GFP_KERNEL);
if (ic->i_send_hdrs == NULL) {
if (!ic->i_send_hdrs) {
ret = -ENOMEM;
rdsdebug("ib_dma_alloc_coherent send failed\n");
goto out;
......@@ -316,7 +360,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
ic->i_recv_ring.w_nr *
sizeof(struct rds_header),
&ic->i_recv_hdrs_dma, GFP_KERNEL);
if (ic->i_recv_hdrs == NULL) {
if (!ic->i_recv_hdrs) {
ret = -ENOMEM;
rdsdebug("ib_dma_alloc_coherent recv failed\n");
goto out;
......@@ -324,22 +368,24 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
ic->i_ack = ib_dma_alloc_coherent(dev, sizeof(struct rds_header),
&ic->i_ack_dma, GFP_KERNEL);
if (ic->i_ack == NULL) {
if (!ic->i_ack) {
ret = -ENOMEM;
rdsdebug("ib_dma_alloc_coherent ack failed\n");
goto out;
}
ic->i_sends = vmalloc(ic->i_send_ring.w_nr * sizeof(struct rds_ib_send_work));
if (ic->i_sends == NULL) {
ic->i_sends = vmalloc_node(ic->i_send_ring.w_nr * sizeof(struct rds_ib_send_work),
ibdev_to_node(dev));
if (!ic->i_sends) {
ret = -ENOMEM;
rdsdebug("send allocation failed\n");
goto out;
}
memset(ic->i_sends, 0, ic->i_send_ring.w_nr * sizeof(struct rds_ib_send_work));
ic->i_recvs = vmalloc(ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work));
if (ic->i_recvs == NULL) {
ic->i_recvs = vmalloc_node(ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work),
ibdev_to_node(dev));
if (!ic->i_recvs) {
ret = -ENOMEM;
rdsdebug("recv allocation failed\n");
goto out;
......@@ -352,6 +398,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
ic->i_send_cq, ic->i_recv_cq);
out:
rds_ib_dev_put(rds_ibdev);
return ret;
}
......@@ -409,7 +456,7 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
struct rds_ib_connection *ic = NULL;
struct rdma_conn_param conn_param;
u32 version;
int err, destroy = 1;
int err = 1, destroy = 1;
/* Check whether the remote protocol version matches ours. */
version = rds_ib_protocol_compatible(event);
......@@ -448,7 +495,6 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
/* Wait and see - our connect may still be succeeding */
rds_ib_stats_inc(s_ib_connect_raced);
}
mutex_unlock(&conn->c_cm_lock);
goto out;
}
......@@ -479,20 +525,20 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
goto out;
}
rds_ib_cm_fill_conn_param(conn, &conn_param, &dp_rep, version);
rds_ib_cm_fill_conn_param(conn, &conn_param, &dp_rep, version,
event->param.conn.responder_resources,
event->param.conn.initiator_depth);
/* rdma_accept() calls rdma_reject() internally if it fails */
err = rdma_accept(cm_id, &conn_param);
mutex_unlock(&conn->c_cm_lock);
if (err) {
if (err)
rds_ib_conn_error(conn, "rdma_accept failed (%d)\n", err);
goto out;
}
return 0;
out:
rdma_reject(cm_id, NULL, 0);
if (conn)
mutex_unlock(&conn->c_cm_lock);
if (err)
rdma_reject(cm_id, NULL, 0);
return destroy;
}
......@@ -516,8 +562,8 @@ int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id)
goto out;
}
rds_ib_cm_fill_conn_param(conn, &conn_param, &dp, RDS_PROTOCOL_VERSION);
rds_ib_cm_fill_conn_param(conn, &conn_param, &dp, RDS_PROTOCOL_VERSION,
UINT_MAX, UINT_MAX);
ret = rdma_connect(cm_id, &conn_param);
if (ret)
rds_ib_conn_error(conn, "rdma_connect failed (%d)\n", ret);
......@@ -601,9 +647,19 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
ic->i_cm_id, err);
}
/*
* We want to wait for tx and rx completion to finish
* before we tear down the connection, but we have to be
* careful not to get stuck waiting on a send ring that
* only has unsignaled sends in it. We've shutdown new
* sends before getting here so by waiting for signaled
* sends to complete we're ensured that there will be no
* more tx processing.
*/
wait_event(rds_ib_ring_empty_wait,
rds_ib_ring_empty(&ic->i_send_ring) &&
rds_ib_ring_empty(&ic->i_recv_ring));
rds_ib_ring_empty(&ic->i_recv_ring) &&
(atomic_read(&ic->i_signaled_sends) == 0));
tasklet_kill(&ic->i_recv_tasklet);
if (ic->i_send_hdrs)
ib_dma_free_coherent(dev,
......@@ -654,9 +710,12 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
BUG_ON(ic->rds_ibdev);
/* Clear pending transmit */
if (ic->i_rm) {
rds_message_put(ic->i_rm);
ic->i_rm = NULL;
if (ic->i_data_op) {
struct rds_message *rm;
rm = container_of(ic->i_data_op, struct rds_message, data);
rds_message_put(rm);
ic->i_data_op = NULL;
}
/* Clear the ACK state */
......@@ -690,12 +749,19 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
{
struct rds_ib_connection *ic;
unsigned long flags;
int ret;
/* XXX too lazy? */
ic = kzalloc(sizeof(struct rds_ib_connection), GFP_KERNEL);
if (ic == NULL)
if (!ic)
return -ENOMEM;
ret = rds_ib_recv_alloc_caches(ic);
if (ret) {
kfree(ic);
return ret;
}
INIT_LIST_HEAD(&ic->ib_node);
tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn,
(unsigned long) ic);
......@@ -703,6 +769,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
#ifndef KERNEL_HAS_ATOMIC64
spin_lock_init(&ic->i_ack_lock);
#endif
atomic_set(&ic->i_signaled_sends, 0);
/*
* rds_ib_conn_shutdown() waits for these to be emptied so they
......@@ -744,6 +811,8 @@ void rds_ib_conn_free(void *arg)
list_del(&ic->ib_node);
spin_unlock_irq(lock_ptr);
rds_ib_recv_free_caches(ic);
kfree(ic);
}
......
......@@ -32,11 +32,16 @@
*/
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/rculist.h>
#include "rds.h"
#include "rdma.h"
#include "ib.h"
#include "xlist.h"
struct workqueue_struct *rds_ib_fmr_wq;
static DEFINE_PER_CPU(unsigned long, clean_list_grace);
#define CLEAN_LIST_BUSY_BIT 0
/*
* This is stored as mr->r_trans_private.
......@@ -45,7 +50,11 @@ struct rds_ib_mr {
struct rds_ib_device *device;
struct rds_ib_mr_pool *pool;
struct ib_fmr *fmr;
struct list_head list;
struct xlist_head xlist;
/* unmap_list is for freeing */
struct list_head unmap_list;
unsigned int remap_count;
struct scatterlist *sg;
......@@ -59,14 +68,16 @@ struct rds_ib_mr {
*/
struct rds_ib_mr_pool {
struct mutex flush_lock; /* serialize fmr invalidate */
struct work_struct flush_worker; /* flush worker */
struct delayed_work flush_worker; /* flush worker */
spinlock_t list_lock; /* protect variables below */
atomic_t item_count; /* total # of MRs */
atomic_t dirty_count; /* # dirty of MRs */
struct list_head drop_list; /* MRs that have reached their max_maps limit */
struct list_head free_list; /* unused MRs */
struct list_head clean_list; /* unused & unamapped MRs */
struct xlist_head drop_list; /* MRs that have reached their max_maps limit */
struct xlist_head free_list; /* unused MRs */
struct xlist_head clean_list; /* global unused & unamapped MRs */
wait_queue_head_t flush_wait;
atomic_t free_pinned; /* memory pinned by free MRs */
unsigned long max_items;
unsigned long max_items_soft;
......@@ -74,7 +85,7 @@ struct rds_ib_mr_pool {
struct ib_fmr_attr fmr_attr;
};
static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all, struct rds_ib_mr **);
static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
......@@ -83,16 +94,17 @@ static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
struct rds_ib_device *rds_ibdev;
struct rds_ib_ipaddr *i_ipaddr;
list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
spin_lock_irq(&rds_ibdev->spinlock);
list_for_each_entry(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
rcu_read_lock();
list_for_each_entry_rcu(rds_ibdev, &rds_ib_devices, list) {
list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
if (i_ipaddr->ipaddr == ipaddr) {
spin_unlock_irq(&rds_ibdev->spinlock);
atomic_inc(&rds_ibdev->refcount);
rcu_read_unlock();
return rds_ibdev;
}
}
spin_unlock_irq(&rds_ibdev->spinlock);
}
rcu_read_unlock();
return NULL;
}
......@@ -108,7 +120,7 @@ static int rds_ib_add_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
i_ipaddr->ipaddr = ipaddr;
spin_lock_irq(&rds_ibdev->spinlock);
list_add_tail(&i_ipaddr->list, &rds_ibdev->ipaddr_list);
list_add_tail_rcu(&i_ipaddr->list, &rds_ibdev->ipaddr_list);
spin_unlock_irq(&rds_ibdev->spinlock);
return 0;
......@@ -116,17 +128,24 @@ static int rds_ib_add_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
{
struct rds_ib_ipaddr *i_ipaddr, *next;
struct rds_ib_ipaddr *i_ipaddr;
struct rds_ib_ipaddr *to_free = NULL;
spin_lock_irq(&rds_ibdev->spinlock);
list_for_each_entry_safe(i_ipaddr, next, &rds_ibdev->ipaddr_list, list) {
list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
if (i_ipaddr->ipaddr == ipaddr) {
list_del(&i_ipaddr->list);
kfree(i_ipaddr);
list_del_rcu(&i_ipaddr->list);
to_free = i_ipaddr;
break;
}
}
spin_unlock_irq(&rds_ibdev->spinlock);
if (to_free) {
synchronize_rcu();
kfree(to_free);
}
}
int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
......@@ -134,8 +153,10 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
struct rds_ib_device *rds_ibdev_old;
rds_ibdev_old = rds_ib_get_device(ipaddr);
if (rds_ibdev_old)
if (rds_ibdev_old) {
rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr);
rds_ib_dev_put(rds_ibdev_old);
}
return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
}
......@@ -156,6 +177,7 @@ void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *con
spin_unlock_irq(&ib_nodev_conns_lock);
ic->rds_ibdev = rds_ibdev;
atomic_inc(&rds_ibdev->refcount);
}
void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn)
......@@ -175,18 +197,18 @@ void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *
spin_unlock(&ib_nodev_conns_lock);
ic->rds_ibdev = NULL;
rds_ib_dev_put(rds_ibdev);
}
void __rds_ib_destroy_conns(struct list_head *list, spinlock_t *list_lock)
void rds_ib_destroy_nodev_conns(void)
{
struct rds_ib_connection *ic, *_ic;
LIST_HEAD(tmp_list);
/* avoid calling conn_destroy with irqs off */
spin_lock_irq(list_lock);
list_splice(list, &tmp_list);
INIT_LIST_HEAD(list);
spin_unlock_irq(list_lock);
spin_lock_irq(&ib_nodev_conns_lock);
list_splice(&ib_nodev_conns, &tmp_list);
spin_unlock_irq(&ib_nodev_conns_lock);
list_for_each_entry_safe(ic, _ic, &tmp_list, ib_node)
rds_conn_destroy(ic->conn);
......@@ -200,12 +222,12 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
if (!pool)
return ERR_PTR(-ENOMEM);
INIT_LIST_HEAD(&pool->free_list);
INIT_LIST_HEAD(&pool->drop_list);
INIT_LIST_HEAD(&pool->clean_list);
INIT_XLIST_HEAD(&pool->free_list);
INIT_XLIST_HEAD(&pool->drop_list);
INIT_XLIST_HEAD(&pool->clean_list);
mutex_init(&pool->flush_lock);
spin_lock_init(&pool->list_lock);
INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
init_waitqueue_head(&pool->flush_wait);
INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
pool->fmr_attr.max_pages = fmr_message_size;
pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
......@@ -233,34 +255,60 @@ void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_co
void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
{
flush_workqueue(rds_wq);
rds_ib_flush_mr_pool(pool, 1);
cancel_delayed_work_sync(&pool->flush_worker);
rds_ib_flush_mr_pool(pool, 1, NULL);
WARN_ON(atomic_read(&pool->item_count));
WARN_ON(atomic_read(&pool->free_pinned));
kfree(pool);
}
static void refill_local(struct rds_ib_mr_pool *pool, struct xlist_head *xl,
struct rds_ib_mr **ibmr_ret)
{
struct xlist_head *ibmr_xl;
ibmr_xl = xlist_del_head_fast(xl);
*ibmr_ret = list_entry(ibmr_xl, struct rds_ib_mr, xlist);
}
static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool)
{
struct rds_ib_mr *ibmr = NULL;
unsigned long flags;
struct xlist_head *ret;
unsigned long *flag;
spin_lock_irqsave(&pool->list_lock, flags);
if (!list_empty(&pool->clean_list)) {
ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, list);
list_del_init(&ibmr->list);
}
spin_unlock_irqrestore(&pool->list_lock, flags);
preempt_disable();
flag = &__get_cpu_var(clean_list_grace);
set_bit(CLEAN_LIST_BUSY_BIT, flag);
ret = xlist_del_head(&pool->clean_list);
if (ret)
ibmr = list_entry(ret, struct rds_ib_mr, xlist);
clear_bit(CLEAN_LIST_BUSY_BIT, flag);
preempt_enable();
return ibmr;
}
static inline void wait_clean_list_grace(void)
{
int cpu;
unsigned long *flag;
for_each_online_cpu(cpu) {
flag = &per_cpu(clean_list_grace, cpu);
while (test_bit(CLEAN_LIST_BUSY_BIT, flag))
cpu_relax();
}
}
static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
{
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
struct rds_ib_mr *ibmr = NULL;
int err = 0, iter = 0;
if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
while (1) {
ibmr = rds_ib_reuse_fmr(pool);
if (ibmr)
......@@ -287,19 +335,24 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
/* We do have some empty MRs. Flush them out. */
rds_ib_stats_inc(s_ib_rdma_mr_pool_wait);
rds_ib_flush_mr_pool(pool, 0);
rds_ib_flush_mr_pool(pool, 0, &ibmr);
if (ibmr)
return ibmr;
}
ibmr = kzalloc(sizeof(*ibmr), GFP_KERNEL);
ibmr = kzalloc_node(sizeof(*ibmr), GFP_KERNEL, rdsibdev_to_node(rds_ibdev));
if (!ibmr) {
err = -ENOMEM;
goto out_no_cigar;
}
memset(ibmr, 0, sizeof(*ibmr));
ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
(IB_ACCESS_LOCAL_WRITE |
IB_ACCESS_REMOTE_READ |
IB_ACCESS_REMOTE_WRITE),
IB_ACCESS_REMOTE_WRITE|
IB_ACCESS_REMOTE_ATOMIC),
&pool->fmr_attr);
if (IS_ERR(ibmr->fmr)) {
err = PTR_ERR(ibmr->fmr);
......@@ -367,7 +420,8 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm
if (page_cnt > fmr_message_size)
return -EINVAL;
dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC);
dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC,
rdsibdev_to_node(rds_ibdev));
if (!dma_pages)
return -ENOMEM;
......@@ -441,7 +495,7 @@ static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
/* FIXME we need a way to tell a r/w MR
* from a r/o MR */
BUG_ON(in_interrupt());
BUG_ON(irqs_disabled());
set_page_dirty(page);
put_page(page);
}
......@@ -476,34 +530,110 @@ static inline unsigned int rds_ib_flush_goal(struct rds_ib_mr_pool *pool, int fr
return 0;
}
/*
* given an xlist of mrs, put them all into the list_head for more processing
*/
static void xlist_append_to_list(struct xlist_head *xlist, struct list_head *list)
{
struct rds_ib_mr *ibmr;
struct xlist_head splice;
struct xlist_head *cur;
struct xlist_head *next;
splice.next = NULL;
xlist_splice(xlist, &splice);
cur = splice.next;
while (cur) {
next = cur->next;
ibmr = list_entry(cur, struct rds_ib_mr, xlist);
list_add_tail(&ibmr->unmap_list, list);
cur = next;
}
}
/*
* this takes a list head of mrs and turns it into an xlist of clusters.
* each cluster has an xlist of MR_CLUSTER_SIZE mrs that are ready for
* reuse.
*/
static void list_append_to_xlist(struct rds_ib_mr_pool *pool,
struct list_head *list, struct xlist_head *xlist,
struct xlist_head **tail_ret)
{
struct rds_ib_mr *ibmr;
struct xlist_head *cur_mr = xlist;
struct xlist_head *tail_mr = NULL;
list_for_each_entry(ibmr, list, unmap_list) {
tail_mr = &ibmr->xlist;
tail_mr->next = NULL;
cur_mr->next = tail_mr;
cur_mr = tail_mr;
}
*tail_ret = tail_mr;
}
/*
* Flush our pool of MRs.
* At a minimum, all currently unused MRs are unmapped.
* If the number of MRs allocated exceeds the limit, we also try
* to free as many MRs as needed to get back to this limit.
*/
static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
int free_all, struct rds_ib_mr **ibmr_ret)
{
struct rds_ib_mr *ibmr, *next;
struct xlist_head clean_xlist;
struct xlist_head *clean_tail;
LIST_HEAD(unmap_list);
LIST_HEAD(fmr_list);
unsigned long unpinned = 0;
unsigned long flags;
unsigned int nfreed = 0, ncleaned = 0, free_goal;
int ret = 0;
rds_ib_stats_inc(s_ib_rdma_mr_pool_flush);
mutex_lock(&pool->flush_lock);
if (ibmr_ret) {
DEFINE_WAIT(wait);
while(!mutex_trylock(&pool->flush_lock)) {
ibmr = rds_ib_reuse_fmr(pool);
if (ibmr) {
*ibmr_ret = ibmr;
finish_wait(&pool->flush_wait, &wait);
goto out_nolock;
}
prepare_to_wait(&pool->flush_wait, &wait,
TASK_UNINTERRUPTIBLE);
if (xlist_empty(&pool->clean_list))
schedule();
ibmr = rds_ib_reuse_fmr(pool);
if (ibmr) {
*ibmr_ret = ibmr;
finish_wait(&pool->flush_wait, &wait);
goto out_nolock;
}
}
finish_wait(&pool->flush_wait, &wait);
} else
mutex_lock(&pool->flush_lock);
if (ibmr_ret) {
ibmr = rds_ib_reuse_fmr(pool);
if (ibmr) {
*ibmr_ret = ibmr;
goto out;
}
}
spin_lock_irqsave(&pool->list_lock, flags);
/* Get the list of all MRs to be dropped. Ordering matters -
* we want to put drop_list ahead of free_list. */
list_splice_init(&pool->free_list, &unmap_list);
list_splice_init(&pool->drop_list, &unmap_list);
* we want to put drop_list ahead of free_list.
*/
xlist_append_to_list(&pool->drop_list, &unmap_list);
xlist_append_to_list(&pool->free_list, &unmap_list);
if (free_all)
list_splice_init(&pool->clean_list, &unmap_list);
spin_unlock_irqrestore(&pool->list_lock, flags);
xlist_append_to_list(&pool->clean_list, &unmap_list);
free_goal = rds_ib_flush_goal(pool, free_all);
......@@ -511,19 +641,20 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
goto out;
/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
list_for_each_entry(ibmr, &unmap_list, list)
list_for_each_entry(ibmr, &unmap_list, unmap_list)
list_add(&ibmr->fmr->list, &fmr_list);
ret = ib_unmap_fmr(&fmr_list);
if (ret)
printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
/* Now we can destroy the DMA mapping and unpin any pages */
list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) {
unpinned += ibmr->sg_len;
__rds_ib_teardown_mr(ibmr);
if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
rds_ib_stats_inc(s_ib_rdma_mr_free);
list_del(&ibmr->list);
list_del(&ibmr->unmap_list);
ib_dealloc_fmr(ibmr->fmr);
kfree(ibmr);
nfreed++;
......@@ -531,9 +662,27 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
ncleaned++;
}
spin_lock_irqsave(&pool->list_lock, flags);
list_splice(&unmap_list, &pool->clean_list);
spin_unlock_irqrestore(&pool->list_lock, flags);
if (!list_empty(&unmap_list)) {
/* we have to make sure that none of the things we're about
* to put on the clean list would race with other cpus trying
* to pull items off. The xlist would explode if we managed to
* remove something from the clean list and then add it back again
* while another CPU was spinning on that same item in xlist_del_head.
*
* This is pretty unlikely, but just in case wait for an xlist grace period
* here before adding anything back into the clean list.
*/
wait_clean_list_grace();
list_append_to_xlist(pool, &unmap_list, &clean_xlist, &clean_tail);
if (ibmr_ret)
refill_local(pool, &clean_xlist, ibmr_ret);
/* refill_local may have emptied our list */
if (!xlist_empty(&clean_xlist))
xlist_add(clean_xlist.next, clean_tail, &pool->clean_list);
}
atomic_sub(unpinned, &pool->free_pinned);
atomic_sub(ncleaned, &pool->dirty_count);
......@@ -541,14 +690,35 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
out:
mutex_unlock(&pool->flush_lock);
if (waitqueue_active(&pool->flush_wait))
wake_up(&pool->flush_wait);
out_nolock:
return ret;
}
int rds_ib_fmr_init(void)
{
rds_ib_fmr_wq = create_workqueue("rds_fmr_flushd");
if (!rds_ib_fmr_wq)
return -ENOMEM;
return 0;
}
/*
* By the time this is called all the IB devices should have been torn down and
* had their pools freed. As each pool is freed its work struct is waited on,
* so the pool flushing work queue should be idle by the time we get here.
*/
void rds_ib_fmr_exit(void)
{
destroy_workqueue(rds_ib_fmr_wq);
}
static void rds_ib_mr_pool_flush_worker(struct work_struct *work)
{
struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker);
struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker.work);
rds_ib_flush_mr_pool(pool, 0);
rds_ib_flush_mr_pool(pool, 0, NULL);
}
void rds_ib_free_mr(void *trans_private, int invalidate)
......@@ -556,47 +726,49 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
struct rds_ib_mr *ibmr = trans_private;
struct rds_ib_device *rds_ibdev = ibmr->device;
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
unsigned long flags;
rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
/* Return it to the pool's free list */
spin_lock_irqsave(&pool->list_lock, flags);
if (ibmr->remap_count >= pool->fmr_attr.max_maps)
list_add(&ibmr->list, &pool->drop_list);
xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->drop_list);
else
list_add(&ibmr->list, &pool->free_list);
xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->free_list);
atomic_add(ibmr->sg_len, &pool->free_pinned);
atomic_inc(&pool->dirty_count);
spin_unlock_irqrestore(&pool->list_lock, flags);
/* If we've pinned too many pages, request a flush */
if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned ||
atomic_read(&pool->dirty_count) >= pool->max_items / 10)
queue_work(rds_wq, &pool->flush_worker);
queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
if (invalidate) {
if (likely(!in_interrupt())) {
rds_ib_flush_mr_pool(pool, 0);
rds_ib_flush_mr_pool(pool, 0, NULL);
} else {
/* We get here if the user created a MR marked
* as use_once and invalidate at the same time. */
queue_work(rds_wq, &pool->flush_worker);
queue_delayed_work(rds_ib_fmr_wq,
&pool->flush_worker, 10);
}
}
rds_ib_dev_put(rds_ibdev);
}
void rds_ib_flush_mrs(void)
{
struct rds_ib_device *rds_ibdev;
down_read(&rds_ib_devices_lock);
list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
if (pool)
rds_ib_flush_mr_pool(pool, 0);
rds_ib_flush_mr_pool(pool, 0, NULL);
}
up_read(&rds_ib_devices_lock);
}
void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
......@@ -628,6 +800,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
ibmr->device = rds_ibdev;
rds_ibdev = NULL;
out:
if (ret) {
......@@ -635,5 +808,8 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
rds_ib_free_mr(ibmr, 0);
ibmr = ERR_PTR(ret);
}
if (rds_ibdev)
rds_ib_dev_put(rds_ibdev);
return ibmr;
}
此差异已折叠。
此差异已折叠。
......@@ -67,6 +67,8 @@ static const char *const rds_ib_stat_names[] = {
"ib_rdma_mr_pool_flush",
"ib_rdma_mr_pool_wait",
"ib_rdma_mr_pool_depleted",
"ib_atomic_cswp",
"ib_atomic_fadd",
};
unsigned int rds_ib_stats_info_copy(struct rds_info_iterator *iter,
......
......@@ -49,10 +49,6 @@ unsigned long rds_ib_sysctl_max_unsig_wrs = 16;
static unsigned long rds_ib_sysctl_max_unsig_wr_min = 1;
static unsigned long rds_ib_sysctl_max_unsig_wr_max = 64;
unsigned long rds_ib_sysctl_max_unsig_bytes = (16 << 20);
static unsigned long rds_ib_sysctl_max_unsig_bytes_min = 1;
static unsigned long rds_ib_sysctl_max_unsig_bytes_max = ~0UL;
/*
* This sysctl does nothing.
*
......@@ -93,15 +89,6 @@ ctl_table rds_ib_sysctl_table[] = {
.extra1 = &rds_ib_sysctl_max_unsig_wr_min,
.extra2 = &rds_ib_sysctl_max_unsig_wr_max,
},
{
.procname = "max_unsignaled_bytes",
.data = &rds_ib_sysctl_max_unsig_bytes,
.maxlen = sizeof(unsigned long),
.mode = 0644,
.proc_handler = proc_doulongvec_minmax,
.extra1 = &rds_ib_sysctl_max_unsig_bytes_min,
.extra2 = &rds_ib_sysctl_max_unsig_bytes_max,
},
{
.procname = "max_recv_allocation",
.data = &rds_ib_sysctl_max_recv_allocation,
......@@ -132,10 +119,10 @@ void rds_ib_sysctl_exit(void)
unregister_sysctl_table(rds_ib_sysctl_hdr);
}
int __init rds_ib_sysctl_init(void)
int rds_ib_sysctl_init(void)
{
rds_ib_sysctl_hdr = register_sysctl_paths(rds_ib_sysctl_path, rds_ib_sysctl_table);
if (rds_ib_sysctl_hdr == NULL)
if (!rds_ib_sysctl_hdr)
return -ENOMEM;
return 0;
}
......@@ -76,7 +76,7 @@ void rds_info_register_func(int optname, rds_info_func func)
BUG_ON(optname < RDS_INFO_FIRST || optname > RDS_INFO_LAST);
spin_lock(&rds_info_lock);
BUG_ON(rds_info_funcs[offset] != NULL);
BUG_ON(rds_info_funcs[offset]);
rds_info_funcs[offset] = func;
spin_unlock(&rds_info_lock);
}
......@@ -102,7 +102,7 @@ EXPORT_SYMBOL_GPL(rds_info_deregister_func);
*/
void rds_info_iter_unmap(struct rds_info_iterator *iter)
{
if (iter->addr != NULL) {
if (iter->addr) {
kunmap_atomic(iter->addr, KM_USER0);
iter->addr = NULL;
}
......@@ -117,7 +117,7 @@ void rds_info_copy(struct rds_info_iterator *iter, void *data,
unsigned long this;
while (bytes) {
if (iter->addr == NULL)
if (!iter->addr)
iter->addr = kmap_atomic(*iter->pages, KM_USER0);
this = min(bytes, PAGE_SIZE - iter->offset);
......@@ -188,7 +188,7 @@ int rds_info_getsockopt(struct socket *sock, int optname, char __user *optval,
>> PAGE_SHIFT;
pages = kmalloc(nr_pages * sizeof(struct page *), GFP_KERNEL);
if (pages == NULL) {
if (!pages) {
ret = -ENOMEM;
goto out;
}
......@@ -206,7 +206,7 @@ int rds_info_getsockopt(struct socket *sock, int optname, char __user *optval,
call_func:
func = rds_info_funcs[optname - RDS_INFO_FIRST];
if (func == NULL) {
if (!func) {
ret = -ENOPROTOOPT;
goto out;
}
......@@ -234,7 +234,7 @@ int rds_info_getsockopt(struct socket *sock, int optname, char __user *optval,
ret = -EFAULT;
out:
for (i = 0; pages != NULL && i < nr_pages; i++)
for (i = 0; pages && i < nr_pages; i++)
put_page(pages[i]);
kfree(pages);
......
......@@ -264,7 +264,6 @@ struct rds_transport rds_iw_transport = {
.laddr_check = rds_iw_laddr_check,
.xmit_complete = rds_iw_xmit_complete,
.xmit = rds_iw_xmit,
.xmit_cong_map = NULL,
.xmit_rdma = rds_iw_xmit_rdma,
.recv = rds_iw_recv,
.conn_alloc = rds_iw_conn_alloc,
......@@ -272,7 +271,6 @@ struct rds_transport rds_iw_transport = {
.conn_connect = rds_iw_conn_connect,
.conn_shutdown = rds_iw_conn_shutdown,
.inc_copy_to_user = rds_iw_inc_copy_to_user,
.inc_purge = rds_iw_inc_purge,
.inc_free = rds_iw_inc_free,
.cm_initiate_connect = rds_iw_cm_initiate_connect,
.cm_handle_connect = rds_iw_cm_handle_connect,
......@@ -289,7 +287,7 @@ struct rds_transport rds_iw_transport = {
.t_prefer_loopback = 1,
};
int __init rds_iw_init(void)
int rds_iw_init(void)
{
int ret;
......
......@@ -70,7 +70,7 @@ struct rds_iw_send_work {
struct rds_message *s_rm;
/* We should really put these into a union: */
struct rds_rdma_op *s_op;
struct rm_rdma_op *s_op;
struct rds_iw_mapping *s_mapping;
struct ib_mr *s_mr;
struct ib_fast_reg_page_list *s_page_list;
......@@ -284,7 +284,7 @@ void rds_iw_conn_free(void *arg);
int rds_iw_conn_connect(struct rds_connection *conn);
void rds_iw_conn_shutdown(struct rds_connection *conn);
void rds_iw_state_change(struct sock *sk);
int __init rds_iw_listen_init(void);
int rds_iw_listen_init(void);
void rds_iw_listen_stop(void);
void __rds_iw_conn_error(struct rds_connection *conn, const char *, ...);
int rds_iw_cm_handle_connect(struct rdma_cm_id *cm_id,
......@@ -321,12 +321,11 @@ void rds_iw_flush_mrs(void);
void rds_iw_remove_cm_id(struct rds_iw_device *rds_iwdev, struct rdma_cm_id *cm_id);
/* ib_recv.c */
int __init rds_iw_recv_init(void);
int rds_iw_recv_init(void);
void rds_iw_recv_exit(void);
int rds_iw_recv(struct rds_connection *conn);
int rds_iw_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
gfp_t page_gfp, int prefill);
void rds_iw_inc_purge(struct rds_incoming *inc);
void rds_iw_inc_free(struct rds_incoming *inc);
int rds_iw_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
size_t size);
......@@ -358,7 +357,7 @@ int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
void rds_iw_send_cq_comp_handler(struct ib_cq *cq, void *context);
void rds_iw_send_init_ring(struct rds_iw_connection *ic);
void rds_iw_send_clear_ring(struct rds_iw_connection *ic);
int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op);
void rds_iw_send_add_credits(struct rds_connection *conn, unsigned int credits);
void rds_iw_advertise_credits(struct rds_connection *conn, unsigned int posted);
int rds_iw_send_grab_credits(struct rds_iw_connection *ic, u32 wanted,
......@@ -371,7 +370,7 @@ unsigned int rds_iw_stats_info_copy(struct rds_info_iterator *iter,
unsigned int avail);
/* ib_sysctl.c */
int __init rds_iw_sysctl_init(void);
int rds_iw_sysctl_init(void);
void rds_iw_sysctl_exit(void);
extern unsigned long rds_iw_sysctl_max_send_wr;
extern unsigned long rds_iw_sysctl_max_recv_wr;
......
......@@ -257,7 +257,7 @@ static int rds_iw_setup_qp(struct rds_connection *conn)
* the rds_iwdev at all.
*/
rds_iwdev = ib_get_client_data(dev, &rds_iw_client);
if (rds_iwdev == NULL) {
if (!rds_iwdev) {
if (printk_ratelimit())
printk(KERN_NOTICE "RDS/IW: No client_data for device %s\n",
dev->name);
......@@ -292,7 +292,7 @@ static int rds_iw_setup_qp(struct rds_connection *conn)
ic->i_send_ring.w_nr *
sizeof(struct rds_header),
&ic->i_send_hdrs_dma, GFP_KERNEL);
if (ic->i_send_hdrs == NULL) {
if (!ic->i_send_hdrs) {
ret = -ENOMEM;
rdsdebug("ib_dma_alloc_coherent send failed\n");
goto out;
......@@ -302,7 +302,7 @@ static int rds_iw_setup_qp(struct rds_connection *conn)
ic->i_recv_ring.w_nr *
sizeof(struct rds_header),
&ic->i_recv_hdrs_dma, GFP_KERNEL);
if (ic->i_recv_hdrs == NULL) {
if (!ic->i_recv_hdrs) {
ret = -ENOMEM;
rdsdebug("ib_dma_alloc_coherent recv failed\n");
goto out;
......@@ -310,14 +310,14 @@ static int rds_iw_setup_qp(struct rds_connection *conn)
ic->i_ack = ib_dma_alloc_coherent(dev, sizeof(struct rds_header),
&ic->i_ack_dma, GFP_KERNEL);
if (ic->i_ack == NULL) {
if (!ic->i_ack) {
ret = -ENOMEM;
rdsdebug("ib_dma_alloc_coherent ack failed\n");
goto out;
}
ic->i_sends = vmalloc(ic->i_send_ring.w_nr * sizeof(struct rds_iw_send_work));
if (ic->i_sends == NULL) {
if (!ic->i_sends) {
ret = -ENOMEM;
rdsdebug("send allocation failed\n");
goto out;
......@@ -325,7 +325,7 @@ static int rds_iw_setup_qp(struct rds_connection *conn)
rds_iw_send_init_ring(ic);
ic->i_recvs = vmalloc(ic->i_recv_ring.w_nr * sizeof(struct rds_iw_recv_work));
if (ic->i_recvs == NULL) {
if (!ic->i_recvs) {
ret = -ENOMEM;
rdsdebug("recv allocation failed\n");
goto out;
......@@ -696,7 +696,7 @@ int rds_iw_conn_alloc(struct rds_connection *conn, gfp_t gfp)
/* XXX too lazy? */
ic = kzalloc(sizeof(struct rds_iw_connection), GFP_KERNEL);
if (ic == NULL)
if (!ic)
return -ENOMEM;
INIT_LIST_HEAD(&ic->iw_node);
......
......@@ -34,7 +34,6 @@
#include <linux/slab.h>
#include "rds.h"
#include "rdma.h"
#include "iw.h"
......
......@@ -53,7 +53,7 @@ static void rds_iw_frag_drop_page(struct rds_page_frag *frag)
static void rds_iw_frag_free(struct rds_page_frag *frag)
{
rdsdebug("frag %p page %p\n", frag, frag->f_page);
BUG_ON(frag->f_page != NULL);
BUG_ON(frag->f_page);
kmem_cache_free(rds_iw_frag_slab, frag);
}
......@@ -143,14 +143,14 @@ static int rds_iw_recv_refill_one(struct rds_connection *conn,
struct ib_sge *sge;
int ret = -ENOMEM;
if (recv->r_iwinc == NULL) {
if (!recv->r_iwinc) {
if (!atomic_add_unless(&rds_iw_allocation, 1, rds_iw_sysctl_max_recv_allocation)) {
rds_iw_stats_inc(s_iw_rx_alloc_limit);
goto out;
}
recv->r_iwinc = kmem_cache_alloc(rds_iw_incoming_slab,
kptr_gfp);
if (recv->r_iwinc == NULL) {
if (!recv->r_iwinc) {
atomic_dec(&rds_iw_allocation);
goto out;
}
......@@ -158,17 +158,17 @@ static int rds_iw_recv_refill_one(struct rds_connection *conn,
rds_inc_init(&recv->r_iwinc->ii_inc, conn, conn->c_faddr);
}
if (recv->r_frag == NULL) {
if (!recv->r_frag) {
recv->r_frag = kmem_cache_alloc(rds_iw_frag_slab, kptr_gfp);
if (recv->r_frag == NULL)
if (!recv->r_frag)
goto out;
INIT_LIST_HEAD(&recv->r_frag->f_item);
recv->r_frag->f_page = NULL;
}
if (ic->i_frag.f_page == NULL) {
if (!ic->i_frag.f_page) {
ic->i_frag.f_page = alloc_page(page_gfp);
if (ic->i_frag.f_page == NULL)
if (!ic->i_frag.f_page)
goto out;
ic->i_frag.f_offset = 0;
}
......@@ -273,7 +273,7 @@ int rds_iw_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
return ret;
}
void rds_iw_inc_purge(struct rds_incoming *inc)
static void rds_iw_inc_purge(struct rds_incoming *inc)
{
struct rds_iw_incoming *iwinc;
struct rds_page_frag *frag;
......@@ -716,7 +716,7 @@ static void rds_iw_process_recv(struct rds_connection *conn,
* into the inc and save the inc so we can hang upcoming fragments
* off its list.
*/
if (iwinc == NULL) {
if (!iwinc) {
iwinc = recv->r_iwinc;
recv->r_iwinc = NULL;
ic->i_iwinc = iwinc;
......@@ -887,7 +887,7 @@ int rds_iw_recv(struct rds_connection *conn)
return ret;
}
int __init rds_iw_recv_init(void)
int rds_iw_recv_init(void)
{
struct sysinfo si;
int ret = -ENOMEM;
......@@ -899,13 +899,13 @@ int __init rds_iw_recv_init(void)
rds_iw_incoming_slab = kmem_cache_create("rds_iw_incoming",
sizeof(struct rds_iw_incoming),
0, 0, NULL);
if (rds_iw_incoming_slab == NULL)
if (!rds_iw_incoming_slab)
goto out;
rds_iw_frag_slab = kmem_cache_create("rds_iw_frag",
sizeof(struct rds_page_frag),
0, 0, NULL);
if (rds_iw_frag_slab == NULL)
if (!rds_iw_frag_slab)
kmem_cache_destroy(rds_iw_incoming_slab);
else
ret = 0;
......
......@@ -36,7 +36,6 @@
#include <linux/dmapool.h>
#include "rds.h"
#include "rdma.h"
#include "iw.h"
static void rds_iw_send_rdma_complete(struct rds_message *rm,
......@@ -64,13 +63,13 @@ static void rds_iw_send_rdma_complete(struct rds_message *rm,
}
static void rds_iw_send_unmap_rdma(struct rds_iw_connection *ic,
struct rds_rdma_op *op)
struct rm_rdma_op *op)
{
if (op->r_mapped) {
if (op->op_mapped) {
ib_dma_unmap_sg(ic->i_cm_id->device,
op->r_sg, op->r_nents,
op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
op->r_mapped = 0;
op->op_sg, op->op_nents,
op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
op->op_mapped = 0;
}
}
......@@ -83,11 +82,11 @@ static void rds_iw_send_unmap_rm(struct rds_iw_connection *ic,
rdsdebug("ic %p send %p rm %p\n", ic, send, rm);
ib_dma_unmap_sg(ic->i_cm_id->device,
rm->m_sg, rm->m_nents,
rm->data.op_sg, rm->data.op_nents,
DMA_TO_DEVICE);
if (rm->m_rdma_op != NULL) {
rds_iw_send_unmap_rdma(ic, rm->m_rdma_op);
if (rm->rdma.op_active) {
rds_iw_send_unmap_rdma(ic, &rm->rdma);
/* If the user asked for a completion notification on this
* message, we can implement three different semantics:
......@@ -111,10 +110,10 @@ static void rds_iw_send_unmap_rm(struct rds_iw_connection *ic,
*/
rds_iw_send_rdma_complete(rm, wc_status);
if (rm->m_rdma_op->r_write)
rds_stats_add(s_send_rdma_bytes, rm->m_rdma_op->r_bytes);
if (rm->rdma.op_write)
rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes);
else
rds_stats_add(s_recv_rdma_bytes, rm->m_rdma_op->r_bytes);
rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes);
}
/* If anyone waited for this message to get flushed out, wake
......@@ -556,25 +555,27 @@ int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
}
/* map the message the first time we see it */
if (ic->i_rm == NULL) {
if (!ic->i_rm) {
/*
printk(KERN_NOTICE "rds_iw_xmit prep msg dport=%u flags=0x%x len=%d\n",
be16_to_cpu(rm->m_inc.i_hdr.h_dport),
rm->m_inc.i_hdr.h_flags,
be32_to_cpu(rm->m_inc.i_hdr.h_len));
*/
if (rm->m_nents) {
rm->m_count = ib_dma_map_sg(dev,
rm->m_sg, rm->m_nents, DMA_TO_DEVICE);
rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->m_count);
if (rm->m_count == 0) {
if (rm->data.op_nents) {
rm->data.op_count = ib_dma_map_sg(dev,
rm->data.op_sg,
rm->data.op_nents,
DMA_TO_DEVICE);
rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->data.op_count);
if (rm->data.op_count == 0) {
rds_iw_stats_inc(s_iw_tx_sg_mapping_failure);
rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
ret = -ENOMEM; /* XXX ? */
goto out;
}
} else {
rm->m_count = 0;
rm->data.op_count = 0;
}
ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
......@@ -590,10 +591,10 @@ int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
/* If it has a RDMA op, tell the peer we did it. This is
* used by the peer to release use-once RDMA MRs. */
if (rm->m_rdma_op) {
if (rm->rdma.op_active) {
struct rds_ext_header_rdma ext_hdr;
ext_hdr.h_rdma_rkey = cpu_to_be32(rm->m_rdma_op->r_key);
ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey);
rds_message_add_extension(&rm->m_inc.i_hdr,
RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));
}
......@@ -621,7 +622,7 @@ int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
send = &ic->i_sends[pos];
first = send;
prev = NULL;
scat = &rm->m_sg[sg];
scat = &rm->data.op_sg[sg];
sent = 0;
i = 0;
......@@ -631,7 +632,7 @@ int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
* or when requested by the user. Right now, we let
* the application choose.
*/
if (rm->m_rdma_op && rm->m_rdma_op->r_fence)
if (rm->rdma.op_active && rm->rdma.op_fence)
send_flags = IB_SEND_FENCE;
/*
......@@ -650,7 +651,7 @@ int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
}
/* if there's data reference it with a chain of work reqs */
for (; i < work_alloc && scat != &rm->m_sg[rm->m_count]; i++) {
for (; i < work_alloc && scat != &rm->data.op_sg[rm->data.op_count]; i++) {
unsigned int len;
send = &ic->i_sends[pos];
......@@ -728,7 +729,7 @@ int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
sent += sizeof(struct rds_header);
/* if we finished the message then send completion owns it */
if (scat == &rm->m_sg[rm->m_count]) {
if (scat == &rm->data.op_sg[rm->data.op_count]) {
prev->s_rm = ic->i_rm;
prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
ic->i_rm = NULL;
......@@ -784,7 +785,7 @@ static void rds_iw_build_send_fastreg(struct rds_iw_device *rds_iwdev, struct rd
ib_update_fast_reg_key(send->s_mr, send->s_remap_count++);
}
int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
{
struct rds_iw_connection *ic = conn->c_transport_data;
struct rds_iw_send_work *send = NULL;
......@@ -794,7 +795,7 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
struct rds_iw_device *rds_iwdev;
struct scatterlist *scat;
unsigned long len;
u64 remote_addr = op->r_remote_addr;
u64 remote_addr = op->op_remote_addr;
u32 pos, fr_pos;
u32 work_alloc;
u32 i;
......@@ -806,21 +807,21 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);
/* map the message the first time we see it */
if (!op->r_mapped) {
op->r_count = ib_dma_map_sg(ic->i_cm_id->device,
op->r_sg, op->r_nents, (op->r_write) ?
DMA_TO_DEVICE : DMA_FROM_DEVICE);
rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->r_count);
if (op->r_count == 0) {
if (!op->op_mapped) {
op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
op->op_sg, op->op_nents, (op->op_write) ?
DMA_TO_DEVICE : DMA_FROM_DEVICE);
rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count);
if (op->op_count == 0) {
rds_iw_stats_inc(s_iw_tx_sg_mapping_failure);
ret = -ENOMEM; /* XXX ? */
goto out;
}
op->r_mapped = 1;
op->op_mapped = 1;
}
if (!op->r_write) {
if (!op->op_write) {
/* Alloc space on the send queue for the fastreg */
work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, 1, &fr_pos);
if (work_alloc != 1) {
......@@ -835,7 +836,7 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
* Instead of knowing how to return a partial rdma read/write we insist that there
* be enough work requests to send the entire message.
*/
i = ceil(op->r_count, rds_iwdev->max_sge);
i = ceil(op->op_count, rds_iwdev->max_sge);
work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, i, &pos);
if (work_alloc != i) {
......@@ -846,17 +847,17 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
}
send = &ic->i_sends[pos];
if (!op->r_write) {
if (!op->op_write) {
first = prev = &ic->i_sends[fr_pos];
} else {
first = send;
prev = NULL;
}
scat = &op->r_sg[0];
scat = &op->op_sg[0];
sent = 0;
num_sge = op->r_count;
num_sge = op->op_count;
for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) {
for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) {
send->s_wr.send_flags = 0;
send->s_queued = jiffies;
......@@ -873,13 +874,13 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
* for local access after RDS is finished with it, using
* IB_WR_RDMA_READ_WITH_INV will invalidate it after the read has completed.
*/
if (op->r_write)
if (op->op_write)
send->s_wr.opcode = IB_WR_RDMA_WRITE;
else
send->s_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
send->s_wr.wr.rdma.remote_addr = remote_addr;
send->s_wr.wr.rdma.rkey = op->r_key;
send->s_wr.wr.rdma.rkey = op->op_rkey;
send->s_op = op;
if (num_sge > rds_iwdev->max_sge) {
......@@ -893,7 +894,7 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
if (prev)
prev->s_wr.next = &send->s_wr;
for (j = 0; j < send->s_wr.num_sge && scat != &op->r_sg[op->r_count]; j++) {
for (j = 0; j < send->s_wr.num_sge && scat != &op->op_sg[op->op_count]; j++) {
len = ib_sg_dma_len(ic->i_cm_id->device, scat);
if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV)
......@@ -927,7 +928,7 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
}
/* if we finished the message then send completion owns it */
if (scat == &op->r_sg[op->r_count])
if (scat == &op->op_sg[op->op_count])
first->s_wr.send_flags = IB_SEND_SIGNALED;
if (i < work_alloc) {
......@@ -941,9 +942,9 @@ int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
* adapters do not allow using the lkey for this at all. To bypass this use a
* fastreg_mr (or possibly a dma_mr)
*/
if (!op->r_write) {
if (!op->op_write) {
rds_iw_build_send_fastreg(rds_iwdev, ic, &ic->i_sends[fr_pos],
op->r_count, sent, conn->c_xmit_rm->m_rs->rs_user_addr);
op->op_count, sent, conn->c_xmit_rm->m_rs->rs_user_addr);
work_alloc++;
}
......
......@@ -122,10 +122,10 @@ void rds_iw_sysctl_exit(void)
unregister_sysctl_table(rds_iw_sysctl_hdr);
}
int __init rds_iw_sysctl_init(void)
int rds_iw_sysctl_init(void)
{
rds_iw_sysctl_hdr = register_sysctl_paths(rds_iw_sysctl_path, rds_iw_sysctl_table);
if (rds_iw_sysctl_hdr == NULL)
if (!rds_iw_sysctl_hdr)
return -ENOMEM;
return 0;
}
......@@ -61,10 +61,17 @@ static int rds_loop_xmit(struct rds_connection *conn, struct rds_message *rm,
unsigned int hdr_off, unsigned int sg,
unsigned int off)
{
/* Do not send cong updates to loopback */
if (rm->m_inc.i_hdr.h_flags & RDS_FLAG_CONG_BITMAP) {
rds_cong_map_updated(conn->c_fcong, ~(u64) 0);
return sizeof(struct rds_header) + RDS_CONG_MAP_BYTES;
}
BUG_ON(hdr_off || sg || off);
rds_inc_init(&rm->m_inc, conn, conn->c_laddr);
rds_message_addref(rm); /* for the inc */
/* For the embedded inc. Matching put is in loop_inc_free() */
rds_message_addref(rm);
rds_recv_incoming(conn, conn->c_laddr, conn->c_faddr, &rm->m_inc,
GFP_KERNEL, KM_USER0);
......@@ -77,16 +84,14 @@ static int rds_loop_xmit(struct rds_connection *conn, struct rds_message *rm,
return sizeof(struct rds_header) + be32_to_cpu(rm->m_inc.i_hdr.h_len);
}
static int rds_loop_xmit_cong_map(struct rds_connection *conn,
struct rds_cong_map *map,
unsigned long offset)
/*
* See rds_loop_xmit(). Since our inc is embedded in the rm, we
* make sure the rm lives at least until the inc is done.
*/
static void rds_loop_inc_free(struct rds_incoming *inc)
{
BUG_ON(offset);
BUG_ON(map != conn->c_lcong);
rds_cong_map_updated(conn->c_fcong, ~(u64) 0);
return sizeof(struct rds_header) + RDS_CONG_MAP_BYTES;
struct rds_message *rm = container_of(inc, struct rds_message, m_inc);
rds_message_put(rm);
}
/* we need to at least give the thread something to succeed */
......@@ -112,7 +117,7 @@ static int rds_loop_conn_alloc(struct rds_connection *conn, gfp_t gfp)
unsigned long flags;
lc = kzalloc(sizeof(struct rds_loop_connection), GFP_KERNEL);
if (lc == NULL)
if (!lc)
return -ENOMEM;
INIT_LIST_HEAD(&lc->loop_node);
......@@ -169,14 +174,12 @@ void rds_loop_exit(void)
*/
struct rds_transport rds_loop_transport = {
.xmit = rds_loop_xmit,
.xmit_cong_map = rds_loop_xmit_cong_map,
.recv = rds_loop_recv,
.conn_alloc = rds_loop_conn_alloc,
.conn_free = rds_loop_conn_free,
.conn_connect = rds_loop_conn_connect,
.conn_shutdown = rds_loop_conn_shutdown,
.inc_copy_to_user = rds_message_inc_copy_to_user,
.inc_purge = rds_message_inc_purge,
.inc_free = rds_message_inc_free,
.inc_free = rds_loop_inc_free,
.t_name = "loopback",
};
......@@ -34,9 +34,6 @@
#include <linux/slab.h>
#include "rds.h"
#include "rdma.h"
static DECLARE_WAIT_QUEUE_HEAD(rds_message_flush_waitq);
static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = {
[RDS_EXTHDR_NONE] = 0,
......@@ -63,29 +60,31 @@ static void rds_message_purge(struct rds_message *rm)
if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
return;
for (i = 0; i < rm->m_nents; i++) {
rdsdebug("putting data page %p\n", (void *)sg_page(&rm->m_sg[i]));
for (i = 0; i < rm->data.op_nents; i++) {
rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
/* XXX will have to put_page for page refs */
__free_page(sg_page(&rm->m_sg[i]));
__free_page(sg_page(&rm->data.op_sg[i]));
}
rm->m_nents = 0;
rm->data.op_nents = 0;
if (rm->m_rdma_op)
rds_rdma_free_op(rm->m_rdma_op);
if (rm->m_rdma_mr)
rds_mr_put(rm->m_rdma_mr);
}
if (rm->rdma.op_active)
rds_rdma_free_op(&rm->rdma);
if (rm->rdma.op_rdma_mr)
rds_mr_put(rm->rdma.op_rdma_mr);
void rds_message_inc_purge(struct rds_incoming *inc)
{
struct rds_message *rm = container_of(inc, struct rds_message, m_inc);
rds_message_purge(rm);
if (rm->atomic.op_active)
rds_atomic_free_op(&rm->atomic);
if (rm->atomic.op_rdma_mr)
rds_mr_put(rm->atomic.op_rdma_mr);
}
void rds_message_put(struct rds_message *rm)
{
rdsdebug("put rm %p ref %d\n", rm, atomic_read(&rm->m_refcount));
if (atomic_read(&rm->m_refcount) == 0) {
printk(KERN_CRIT "danger refcount zero on %p\n", rm);
WARN_ON(1);
}
if (atomic_dec_and_test(&rm->m_refcount)) {
BUG_ON(!list_empty(&rm->m_sock_item));
BUG_ON(!list_empty(&rm->m_conn_item));
......@@ -96,12 +95,6 @@ void rds_message_put(struct rds_message *rm)
}
EXPORT_SYMBOL_GPL(rds_message_put);
void rds_message_inc_free(struct rds_incoming *inc)
{
struct rds_message *rm = container_of(inc, struct rds_message, m_inc);
rds_message_put(rm);
}
void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
__be16 dport, u64 seq)
{
......@@ -214,41 +207,68 @@ int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 o
}
EXPORT_SYMBOL_GPL(rds_message_add_rdma_dest_extension);
struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp)
/*
* Each rds_message is allocated with extra space for the scatterlist entries
* rds ops will need. This is to minimize memory allocation count. Then, each rds op
* can grab SGs when initializing its part of the rds_message.
*/
struct rds_message *rds_message_alloc(unsigned int extra_len, gfp_t gfp)
{
struct rds_message *rm;
rm = kzalloc(sizeof(struct rds_message) +
(nents * sizeof(struct scatterlist)), gfp);
rm = kzalloc(sizeof(struct rds_message) + extra_len, gfp);
if (!rm)
goto out;
if (nents)
sg_init_table(rm->m_sg, nents);
rm->m_used_sgs = 0;
rm->m_total_sgs = extra_len / sizeof(struct scatterlist);
atomic_set(&rm->m_refcount, 1);
INIT_LIST_HEAD(&rm->m_sock_item);
INIT_LIST_HEAD(&rm->m_conn_item);
spin_lock_init(&rm->m_rs_lock);
init_waitqueue_head(&rm->m_flush_wait);
out:
return rm;
}
/*
* RDS ops use this to grab SG entries from the rm's sg pool.
*/
struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents)
{
struct scatterlist *sg_first = (struct scatterlist *) &rm[1];
struct scatterlist *sg_ret;
WARN_ON(rm->m_used_sgs + nents > rm->m_total_sgs);
WARN_ON(!nents);
sg_ret = &sg_first[rm->m_used_sgs];
sg_init_table(sg_ret, nents);
rm->m_used_sgs += nents;
return sg_ret;
}
struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len)
{
struct rds_message *rm;
unsigned int i;
int num_sgs = ceil(total_len, PAGE_SIZE);
int extra_bytes = num_sgs * sizeof(struct scatterlist);
rm = rds_message_alloc(ceil(total_len, PAGE_SIZE), GFP_KERNEL);
if (rm == NULL)
rm = rds_message_alloc(extra_bytes, GFP_NOWAIT);
if (!rm)
return ERR_PTR(-ENOMEM);
set_bit(RDS_MSG_PAGEVEC, &rm->m_flags);
rm->m_inc.i_hdr.h_len = cpu_to_be32(total_len);
rm->m_nents = ceil(total_len, PAGE_SIZE);
rm->data.op_nents = ceil(total_len, PAGE_SIZE);
rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
for (i = 0; i < rm->m_nents; ++i) {
sg_set_page(&rm->m_sg[i],
for (i = 0; i < rm->data.op_nents; ++i) {
sg_set_page(&rm->data.op_sg[i],
virt_to_page(page_addrs[i]),
PAGE_SIZE, 0);
}
......@@ -256,40 +276,33 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
return rm;
}
struct rds_message *rds_message_copy_from_user(struct iovec *first_iov,
int rds_message_copy_from_user(struct rds_message *rm, struct iovec *first_iov,
size_t total_len)
{
unsigned long to_copy;
unsigned long iov_off;
unsigned long sg_off;
struct rds_message *rm;
struct iovec *iov;
struct scatterlist *sg;
int ret;
rm = rds_message_alloc(ceil(total_len, PAGE_SIZE), GFP_KERNEL);
if (rm == NULL) {
ret = -ENOMEM;
goto out;
}
int ret = 0;
rm->m_inc.i_hdr.h_len = cpu_to_be32(total_len);
/*
* now allocate and copy in the data payload.
*/
sg = rm->m_sg;
sg = rm->data.op_sg;
iov = first_iov;
iov_off = 0;
sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
while (total_len) {
if (sg_page(sg) == NULL) {
if (!sg_page(sg)) {
ret = rds_page_remainder_alloc(sg, total_len,
GFP_HIGHUSER);
if (ret)
goto out;
rm->m_nents++;
rm->data.op_nents++;
sg_off = 0;
}
......@@ -320,14 +333,8 @@ struct rds_message *rds_message_copy_from_user(struct iovec *first_iov,
sg++;
}
ret = 0;
out:
if (ret) {
if (rm)
rds_message_put(rm);
rm = ERR_PTR(ret);
}
return rm;
return ret;
}
int rds_message_inc_copy_to_user(struct rds_incoming *inc,
......@@ -348,7 +355,7 @@ int rds_message_inc_copy_to_user(struct rds_incoming *inc,
iov = first_iov;
iov_off = 0;
sg = rm->m_sg;
sg = rm->data.op_sg;
vec_off = 0;
copied = 0;
......@@ -394,15 +401,14 @@ int rds_message_inc_copy_to_user(struct rds_incoming *inc,
*/
void rds_message_wait(struct rds_message *rm)
{
wait_event(rds_message_flush_waitq,
wait_event_interruptible(rm->m_flush_wait,
!test_bit(RDS_MSG_MAPPED, &rm->m_flags));
}
void rds_message_unmapped(struct rds_message *rm)
{
clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
if (waitqueue_active(&rds_message_flush_waitq))
wake_up(&rds_message_flush_waitq);
wake_up_interruptible(&rm->m_flush_wait);
}
EXPORT_SYMBOL_GPL(rds_message_unmapped);
......@@ -116,7 +116,7 @@ int rds_page_remainder_alloc(struct scatterlist *scat, unsigned long bytes,
/* jump straight to allocation if we're trying for a huge page */
if (bytes >= PAGE_SIZE) {
page = alloc_page(gfp);
if (page == NULL) {
if (!page) {
ret = -ENOMEM;
} else {
sg_set_page(scat, page, PAGE_SIZE, 0);
......@@ -162,7 +162,7 @@ int rds_page_remainder_alloc(struct scatterlist *scat, unsigned long bytes,
rem = &per_cpu(rds_page_remainders, get_cpu());
local_irq_save(flags);
if (page == NULL) {
if (!page) {
ret = -ENOMEM;
break;
}
......@@ -186,6 +186,7 @@ int rds_page_remainder_alloc(struct scatterlist *scat, unsigned long bytes,
ret ? 0 : scat->length);
return ret;
}
EXPORT_SYMBOL_GPL(rds_page_remainder_alloc);
static int rds_page_remainder_cpu_notify(struct notifier_block *self,
unsigned long action, void *hcpu)
......
此差异已折叠。
#ifndef _RDS_RDMA_H
#define _RDS_RDMA_H
#include <linux/rbtree.h>
#include <linux/spinlock.h>
#include <linux/scatterlist.h>
#include "rds.h"
struct rds_mr {
struct rb_node r_rb_node;
atomic_t r_refcount;
u32 r_key;
/* A copy of the creation flags */
unsigned int r_use_once:1;
unsigned int r_invalidate:1;
unsigned int r_write:1;
/* This is for RDS_MR_DEAD.
* It would be nice & consistent to make this part of the above
* bit field here, but we need to use test_and_set_bit.
*/
unsigned long r_state;
struct rds_sock *r_sock; /* back pointer to the socket that owns us */
struct rds_transport *r_trans;
void *r_trans_private;
};
/* Flags for mr->r_state */
#define RDS_MR_DEAD 0
struct rds_rdma_op {
u32 r_key;
u64 r_remote_addr;
unsigned int r_write:1;
unsigned int r_fence:1;
unsigned int r_notify:1;
unsigned int r_recverr:1;
unsigned int r_mapped:1;
struct rds_notifier *r_notifier;
unsigned int r_bytes;
unsigned int r_nents;
unsigned int r_count;
struct scatterlist r_sg[0];
};
static inline rds_rdma_cookie_t rds_rdma_make_cookie(u32 r_key, u32 offset)
{
return r_key | (((u64) offset) << 32);
}
static inline u32 rds_rdma_cookie_key(rds_rdma_cookie_t cookie)
{
return cookie;
}
static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
{
return cookie >> 32;
}
int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen);
int rds_get_mr_for_dest(struct rds_sock *rs, char __user *optval, int optlen);
int rds_free_mr(struct rds_sock *rs, char __user *optval, int optlen);
void rds_rdma_drop_keys(struct rds_sock *rs);
int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
struct cmsghdr *cmsg);
int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
struct cmsghdr *cmsg);
int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
struct cmsghdr *cmsg);
int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
struct cmsghdr *cmsg);
void rds_rdma_free_op(struct rds_rdma_op *ro);
void rds_rdma_send_complete(struct rds_message *rm, int);
extern void __rds_put_mr_final(struct rds_mr *mr);
static inline void rds_mr_put(struct rds_mr *mr)
{
if (atomic_dec_and_test(&mr->r_refcount))
__rds_put_mr_final(mr);
}
#endif
......@@ -36,6 +36,34 @@
static struct rdma_cm_id *rds_rdma_listen_id;
static char *rds_cm_event_strings[] = {
#define RDS_CM_EVENT_STRING(foo) \
[RDMA_CM_EVENT_##foo] = __stringify(RDMA_CM_EVENT_##foo)
RDS_CM_EVENT_STRING(ADDR_RESOLVED),
RDS_CM_EVENT_STRING(ADDR_ERROR),
RDS_CM_EVENT_STRING(ROUTE_RESOLVED),
RDS_CM_EVENT_STRING(ROUTE_ERROR),
RDS_CM_EVENT_STRING(CONNECT_REQUEST),
RDS_CM_EVENT_STRING(CONNECT_RESPONSE),
RDS_CM_EVENT_STRING(CONNECT_ERROR),
RDS_CM_EVENT_STRING(UNREACHABLE),
RDS_CM_EVENT_STRING(REJECTED),
RDS_CM_EVENT_STRING(ESTABLISHED),
RDS_CM_EVENT_STRING(DISCONNECTED),
RDS_CM_EVENT_STRING(DEVICE_REMOVAL),
RDS_CM_EVENT_STRING(MULTICAST_JOIN),
RDS_CM_EVENT_STRING(MULTICAST_ERROR),
RDS_CM_EVENT_STRING(ADDR_CHANGE),
RDS_CM_EVENT_STRING(TIMEWAIT_EXIT),
#undef RDS_CM_EVENT_STRING
};
static char *rds_cm_event_str(enum rdma_cm_event_type type)
{
return rds_str_array(rds_cm_event_strings,
ARRAY_SIZE(rds_cm_event_strings), type);
};
int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id,
struct rdma_cm_event *event)
{
......@@ -44,8 +72,8 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id,
struct rds_transport *trans;
int ret = 0;
rdsdebug("conn %p id %p handling event %u\n", conn, cm_id,
event->event);
rdsdebug("conn %p id %p handling event %u (%s)\n", conn, cm_id,
event->event, rds_cm_event_str(event->event));
if (cm_id->device->node_type == RDMA_NODE_RNIC)
trans = &rds_iw_transport;
......@@ -109,7 +137,8 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id,
default:
/* things like device disconnect? */
printk(KERN_ERR "RDS: unknown event %u!\n", event->event);
printk(KERN_ERR "RDS: unknown event %u (%s)!\n",
event->event, rds_cm_event_str(event->event));
break;
}
......@@ -117,12 +146,13 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id,
if (conn)
mutex_unlock(&conn->c_cm_lock);
rdsdebug("id %p event %u handling ret %d\n", cm_id, event->event, ret);
rdsdebug("id %p event %u (%s) handling ret %d\n", cm_id, event->event,
rds_cm_event_str(event->event), ret);
return ret;
}
static int __init rds_rdma_listen_init(void)
static int rds_rdma_listen_init(void)
{
struct sockaddr_in sin;
struct rdma_cm_id *cm_id;
......@@ -177,7 +207,7 @@ static void rds_rdma_listen_stop(void)
}
}
int __init rds_rdma_init(void)
int rds_rdma_init(void)
{
int ret;
......
此差异已折叠。
......@@ -36,7 +36,6 @@
#include <linux/in.h>
#include "rds.h"
#include "rdma.h"
void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
__be32 saddr)
......@@ -210,7 +209,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
}
rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
if (rs == NULL) {
if (!rs) {
rds_stats_inc(s_recv_drop_no_sock);
goto out;
}
......@@ -251,7 +250,7 @@ static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
{
unsigned long flags;
if (*inc == NULL) {
if (!*inc) {
read_lock_irqsave(&rs->rs_recv_lock, flags);
if (!list_empty(&rs->rs_recv_queue)) {
*inc = list_entry(rs->rs_recv_queue.next,
......@@ -334,10 +333,10 @@ int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
if (msghdr) {
cmsg.user_token = notifier->n_user_token;
cmsg.status = notifier->n_status;
cmsg.status = notifier->n_status;
err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
sizeof(cmsg), &cmsg);
sizeof(cmsg), &cmsg);
if (err)
break;
}
......
此差异已折叠。
......@@ -57,8 +57,8 @@ static const char *const rds_stat_names[] = {
"recv_ping",
"send_queue_empty",
"send_queue_full",
"send_sem_contention",
"send_sem_queue_raced",
"send_lock_contention",
"send_lock_queue_raced",
"send_immediate_retry",
"send_delayed_retry",
"send_drop_acked",
......@@ -143,7 +143,7 @@ void rds_stats_exit(void)
rds_info_deregister_func(RDS_INFO_COUNTERS, rds_stats_info);
}
int __init rds_stats_init(void)
int rds_stats_init(void)
{
rds_info_register_func(RDS_INFO_COUNTERS, rds_stats_info);
return 0;
......
......@@ -105,13 +105,13 @@ void rds_sysctl_exit(void)
unregister_sysctl_table(rds_sysctl_reg_table);
}
int __init rds_sysctl_init(void)
int rds_sysctl_init(void)
{
rds_sysctl_reconnect_min = msecs_to_jiffies(1);
rds_sysctl_reconnect_min_jiffies = rds_sysctl_reconnect_min;
rds_sysctl_reg_table = register_sysctl_paths(rds_sysctl_path, rds_sysctl_rds_table);
if (rds_sysctl_reg_table == NULL)
if (!rds_sysctl_reg_table)
return -ENOMEM;
return 0;
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册