提交 eed889b1 编写于 作者: T Trond Myklebust

Merge tag 'nfs-rdma-for-4.3' of git://git.linux-nfs.org/projects/anna/nfs-rdma

NFS: NFS over RDMA Client Side Changes

These patches improve both client performance and scalability, most notably
by increasing the maixmum allowed rsize and wsize and by increasing the number
of RDMA "credits".  There are also several bugfixes, such as correcting how
WRITE compounds are encoded and fixing large NFS symlink operations.
Signed-off-by: NAnna Schumaker <Anna.Schumaker@Netapp.com>
......@@ -1144,73 +1144,6 @@ struct ib_mr *ib_get_dma_mr(struct ib_pd *pd, int mr_access_flags)
}
EXPORT_SYMBOL(ib_get_dma_mr);
struct ib_mr *ib_reg_phys_mr(struct ib_pd *pd,
struct ib_phys_buf *phys_buf_array,
int num_phys_buf,
int mr_access_flags,
u64 *iova_start)
{
struct ib_mr *mr;
int err;
err = ib_check_mr_access(mr_access_flags);
if (err)
return ERR_PTR(err);
if (!pd->device->reg_phys_mr)
return ERR_PTR(-ENOSYS);
mr = pd->device->reg_phys_mr(pd, phys_buf_array, num_phys_buf,
mr_access_flags, iova_start);
if (!IS_ERR(mr)) {
mr->device = pd->device;
mr->pd = pd;
mr->uobject = NULL;
atomic_inc(&pd->usecnt);
atomic_set(&mr->usecnt, 0);
}
return mr;
}
EXPORT_SYMBOL(ib_reg_phys_mr);
int ib_rereg_phys_mr(struct ib_mr *mr,
int mr_rereg_mask,
struct ib_pd *pd,
struct ib_phys_buf *phys_buf_array,
int num_phys_buf,
int mr_access_flags,
u64 *iova_start)
{
struct ib_pd *old_pd;
int ret;
ret = ib_check_mr_access(mr_access_flags);
if (ret)
return ret;
if (!mr->device->rereg_phys_mr)
return -ENOSYS;
if (atomic_read(&mr->usecnt))
return -EBUSY;
old_pd = mr->pd;
ret = mr->device->rereg_phys_mr(mr, mr_rereg_mask, pd,
phys_buf_array, num_phys_buf,
mr_access_flags, iova_start);
if (!ret && (mr_rereg_mask & IB_MR_REREG_PD)) {
atomic_dec(&old_pd->usecnt);
atomic_inc(&pd->usecnt);
}
return ret;
}
EXPORT_SYMBOL(ib_rereg_phys_mr);
int ib_query_mr(struct ib_mr *mr, struct ib_mr_attr *mr_attr)
{
return mr->device->query_mr ?
......
......@@ -1103,6 +1103,7 @@ static void nfs3_xdr_enc_symlink3args(struct rpc_rqst *req,
{
encode_diropargs3(xdr, args->fromfh, args->fromname, args->fromlen);
encode_symlinkdata3(xdr, args);
xdr->buf->flags |= XDRBUF_WRITE;
}
/*
......
......@@ -1154,7 +1154,9 @@ static void encode_create(struct xdr_stream *xdr, const struct nfs4_create_arg *
case NF4LNK:
p = reserve_space(xdr, 4);
*p = cpu_to_be32(create->u.symlink.len);
xdr_write_pages(xdr, create->u.symlink.pages, 0, create->u.symlink.len);
xdr_write_pages(xdr, create->u.symlink.pages, 0,
create->u.symlink.len);
xdr->buf->flags |= XDRBUF_WRITE;
break;
case NF4BLK: case NF4CHR:
......
......@@ -49,7 +49,7 @@
* a single chunk type per message is supported currently.
*/
#define RPCRDMA_MIN_SLOT_TABLE (2U)
#define RPCRDMA_DEF_SLOT_TABLE (32U)
#define RPCRDMA_DEF_SLOT_TABLE (128U)
#define RPCRDMA_MAX_SLOT_TABLE (256U)
#define RPCRDMA_DEF_INLINE (1024) /* default inline max */
......
......@@ -2759,52 +2759,6 @@ static inline void ib_dma_free_coherent(struct ib_device *dev,
dma_free_coherent(dev->dma_device, size, cpu_addr, dma_handle);
}
/**
* ib_reg_phys_mr - Prepares a virtually addressed memory region for use
* by an HCA.
* @pd: The protection domain associated assigned to the registered region.
* @phys_buf_array: Specifies a list of physical buffers to use in the
* memory region.
* @num_phys_buf: Specifies the size of the phys_buf_array.
* @mr_access_flags: Specifies the memory access rights.
* @iova_start: The offset of the region's starting I/O virtual address.
*/
struct ib_mr *ib_reg_phys_mr(struct ib_pd *pd,
struct ib_phys_buf *phys_buf_array,
int num_phys_buf,
int mr_access_flags,
u64 *iova_start);
/**
* ib_rereg_phys_mr - Modifies the attributes of an existing memory region.
* Conceptually, this call performs the functions deregister memory region
* followed by register physical memory region. Where possible,
* resources are reused instead of deallocated and reallocated.
* @mr: The memory region to modify.
* @mr_rereg_mask: A bit-mask used to indicate which of the following
* properties of the memory region are being modified.
* @pd: If %IB_MR_REREG_PD is set in mr_rereg_mask, this field specifies
* the new protection domain to associated with the memory region,
* otherwise, this parameter is ignored.
* @phys_buf_array: If %IB_MR_REREG_TRANS is set in mr_rereg_mask, this
* field specifies a list of physical buffers to use in the new
* translation, otherwise, this parameter is ignored.
* @num_phys_buf: If %IB_MR_REREG_TRANS is set in mr_rereg_mask, this
* field specifies the size of the phys_buf_array, otherwise, this
* parameter is ignored.
* @mr_access_flags: If %IB_MR_REREG_ACCESS is set in mr_rereg_mask, this
* field specifies the new memory access rights, otherwise, this
* parameter is ignored.
* @iova_start: The offset of the region's starting I/O virtual address.
*/
int ib_rereg_phys_mr(struct ib_mr *mr,
int mr_rereg_mask,
struct ib_pd *pd,
struct ib_phys_buf *phys_buf_array,
int num_phys_buf,
int mr_access_flags,
u64 *iova_start);
/**
* ib_query_mr - Retrieves information about a specific memory region.
* @mr: The memory region to retrieve information about.
......
......@@ -39,6 +39,25 @@ static int
fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
struct rpcrdma_create_data_internal *cdata)
{
struct ib_device_attr *devattr = &ia->ri_devattr;
struct ib_mr *mr;
/* Obtain an lkey to use for the regbufs, which are
* protected from remote access.
*/
if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
} else {
mr = ib_get_dma_mr(ia->ri_pd, IB_ACCESS_LOCAL_WRITE);
if (IS_ERR(mr)) {
pr_err("%s: ib_get_dma_mr for failed with %lX\n",
__func__, PTR_ERR(mr));
return -ENOMEM;
}
ia->ri_dma_lkey = ia->ri_dma_mr->lkey;
ia->ri_dma_mr = mr;
}
return 0;
}
......
......@@ -189,6 +189,11 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
struct ib_device_attr *devattr = &ia->ri_devattr;
int depth, delta;
/* Obtain an lkey to use for the regbufs, which are
* protected from remote access.
*/
ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
ia->ri_max_frmr_depth =
min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
devattr->max_fast_reg_page_list_len);
......
......@@ -23,6 +23,29 @@ static int
physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
struct rpcrdma_create_data_internal *cdata)
{
struct ib_device_attr *devattr = &ia->ri_devattr;
struct ib_mr *mr;
/* Obtain an rkey to use for RPC data payloads.
*/
mr = ib_get_dma_mr(ia->ri_pd,
IB_ACCESS_LOCAL_WRITE |
IB_ACCESS_REMOTE_WRITE |
IB_ACCESS_REMOTE_READ);
if (IS_ERR(mr)) {
pr_err("%s: ib_get_dma_mr for failed with %lX\n",
__func__, PTR_ERR(mr));
return -ENOMEM;
}
ia->ri_dma_mr = mr;
/* Obtain an lkey to use for regbufs.
*/
if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)
ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
else
ia->ri_dma_lkey = ia->ri_dma_mr->lkey;
return 0;
}
......@@ -51,7 +74,7 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing));
seg->mr_rkey = ia->ri_bind_mem->rkey;
seg->mr_rkey = ia->ri_dma_mr->rkey;
seg->mr_base = seg->mr_dma;
seg->mr_nsegs = 1;
return 1;
......
......@@ -71,6 +71,67 @@ static const char transfertypes[][12] = {
};
#endif
/* The client can send a request inline as long as the RPCRDMA header
* plus the RPC call fit under the transport's inline limit. If the
* combined call message size exceeds that limit, the client must use
* the read chunk list for this operation.
*/
static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
{
unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len;
return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
}
/* The client can't know how large the actual reply will be. Thus it
* plans for the largest possible reply for that particular ULP
* operation. If the maximum combined reply message size exceeds that
* limit, the client must provide a write list or a reply chunk for
* this request.
*/
static bool rpcrdma_results_inline(struct rpc_rqst *rqst)
{
unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen;
return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst);
}
static int
rpcrdma_tail_pullup(struct xdr_buf *buf)
{
size_t tlen = buf->tail[0].iov_len;
size_t skip = tlen & 3;
/* Do not include the tail if it is only an XDR pad */
if (tlen < 4)
return 0;
/* xdr_write_pages() adds a pad at the beginning of the tail
* if the content in "buf->pages" is unaligned. Force the
* tail's actual content to land at the next XDR position
* after the head instead.
*/
if (skip) {
unsigned char *src, *dst;
unsigned int count;
src = buf->tail[0].iov_base;
dst = buf->head[0].iov_base;
dst += buf->head[0].iov_len;
src += skip;
tlen -= skip;
dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n",
__func__, skip, dst, src, tlen);
for (count = tlen; count; count--)
*dst++ = *src++;
}
return tlen;
}
/*
* Chunk assembly from upper layer xdr_buf.
*
......@@ -122,6 +183,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
if (len && n == nsegs)
return -EIO;
/* When encoding the read list, the tail is always sent inline */
if (type == rpcrdma_readch)
return n;
if (xdrbuf->tail[0].iov_len) {
/* the rpcrdma protocol allows us to omit any trailing
* xdr pad bytes, saving the server an RDMA operation. */
......@@ -297,8 +362,7 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
* pre-registered memory buffer for this request. For small amounts
* of data, this is efficient. The cutoff value is tunable.
*/
static int
rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
{
int i, npages, curlen;
int copy_len;
......@@ -310,16 +374,9 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
destp = rqst->rq_svec[0].iov_base;
curlen = rqst->rq_svec[0].iov_len;
destp += curlen;
/*
* Do optional padding where it makes sense. Alignment of write
* payload can help the server, if our setting is accurate.
*/
pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
pad = 0; /* don't pad this request */
dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n",
__func__, pad, destp, rqst->rq_slen, curlen);
dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n",
__func__, destp, rqst->rq_slen, curlen);
copy_len = rqst->rq_snd_buf.page_len;
......@@ -355,7 +412,6 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
page_base = 0;
}
/* header now contains entire send message */
return pad;
}
/*
......@@ -380,7 +436,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
char *base;
size_t rpclen, padlen;
size_t rpclen;
ssize_t hdrlen;
enum rpcrdma_chunktype rtype, wtype;
struct rpcrdma_msg *headerp;
......@@ -402,28 +458,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
/*
* Chunks needed for results?
*
* o Read ops return data as write chunk(s), header as inline.
* o If the expected result is under the inline threshold, all ops
* return as inline (but see later).
* return as inline.
* o Large non-read ops return as a single reply chunk.
* o Large read ops return data as write chunk(s), header as inline.
*
* Note: the NFS code sending down multiple result segments implies
* the op is one of read, readdir[plus], readlink or NFSv4 getacl.
*/
/*
* This code can handle read chunks, write chunks OR reply
* chunks -- only one type. If the request is too big to fit
* inline, then we will choose read chunks. If the request is
* a READ, then use write chunks to separate the file data
* into pages; otherwise use reply chunks.
*/
if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
wtype = rpcrdma_noch;
else if (rqst->rq_rcv_buf.page_len == 0)
wtype = rpcrdma_replych;
else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
wtype = rpcrdma_writech;
else if (rpcrdma_results_inline(rqst))
wtype = rpcrdma_noch;
else
wtype = rpcrdma_replych;
......@@ -432,21 +475,25 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
*
* o If the total request is under the inline threshold, all ops
* are sent as inline.
* o Large non-write ops are sent with the entire message as a
* single read chunk (protocol 0-position special case).
* o Large write ops transmit data as read chunk(s), header as
* inline.
* o Large non-write ops are sent with the entire message as a
* single read chunk (protocol 0-position special case).
*
* Note: the NFS code sending down multiple argument segments
* implies the op is a write.
* TBD check NFSv4 setacl
* This assumes that the upper layer does not present a request
* that both has a data payload, and whose non-data arguments
* by themselves are larger than the inline threshold.
*/
if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
if (rpcrdma_args_inline(rqst)) {
rtype = rpcrdma_noch;
else if (rqst->rq_snd_buf.page_len == 0)
rtype = rpcrdma_areadch;
else
} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
rtype = rpcrdma_readch;
} else {
r_xprt->rx_stats.nomsg_call_count++;
headerp->rm_type = htonl(RDMA_NOMSG);
rtype = rpcrdma_areadch;
rpclen = 0;
}
/* The following simplification is not true forever */
if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
......@@ -458,7 +505,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
}
hdrlen = RPCRDMA_HDRLEN_MIN;
padlen = 0;
/*
* Pull up any extra send data into the preregistered buffer.
......@@ -467,45 +513,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
*/
if (rtype == rpcrdma_noch) {
padlen = rpcrdma_inline_pullup(rqst,
RPCRDMA_INLINE_PAD_VALUE(rqst));
if (padlen) {
headerp->rm_type = rdma_msgp;
headerp->rm_body.rm_padded.rm_align =
cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst));
headerp->rm_body.rm_padded.rm_thresh =
cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH);
headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
if (wtype != rpcrdma_noch) {
dprintk("RPC: %s: invalid chunk list\n",
__func__);
return -EIO;
}
} else {
headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
/* new length after pullup */
rpclen = rqst->rq_svec[0].iov_len;
/*
* Currently we try to not actually use read inline.
* Reply chunks have the desirable property that
* they land, packed, directly in the target buffers
* without headers, so they require no fixup. The
* additional RDMA Write op sends the same amount
* of data, streams on-the-wire and adds no overhead
* on receive. Therefore, we request a reply chunk
* for non-writes wherever feasible and efficient.
*/
if (wtype == rpcrdma_noch)
wtype = rpcrdma_replych;
}
}
rpcrdma_inline_pullup(rqst);
headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
/* new length after pullup */
rpclen = rqst->rq_svec[0].iov_len;
} else if (rtype == rpcrdma_readch)
rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
if (rtype != rpcrdma_noch) {
hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
headerp, rtype);
......@@ -518,9 +534,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
if (hdrlen < 0)
return hdrlen;
dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd"
dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd"
" headerp 0x%p base 0x%p lkey 0x%x\n",
__func__, transfertypes[wtype], hdrlen, rpclen, padlen,
__func__, transfertypes[wtype], hdrlen, rpclen,
headerp, base, rdmab_lkey(req->rl_rdmabuf));
/*
......@@ -534,26 +550,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
req->rl_send_iov[0].length = hdrlen;
req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
req->rl_niovs = 1;
if (rtype == rpcrdma_areadch)
return 0;
req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
req->rl_send_iov[1].length = rpclen;
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
req->rl_niovs = 2;
if (padlen) {
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf);
req->rl_send_iov[2].length = padlen;
req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf);
req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);
req->rl_niovs = 4;
}
return 0;
}
......
......@@ -175,10 +175,8 @@ xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
}
static void
xprt_rdma_format_addresses(struct rpc_xprt *xprt)
xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
{
struct sockaddr *sap = (struct sockaddr *)
&rpcx_to_rdmad(xprt).addr;
char buf[128];
switch (sap->sa_family) {
......@@ -302,7 +300,7 @@ xprt_setup_rdma(struct xprt_create *args)
struct rpc_xprt *xprt;
struct rpcrdma_xprt *new_xprt;
struct rpcrdma_ep *new_ep;
struct sockaddr_in *sin;
struct sockaddr *sap;
int rc;
if (args->addrlen > sizeof(xprt->addr)) {
......@@ -333,26 +331,20 @@ xprt_setup_rdma(struct xprt_create *args)
* Set up RDMA-specific connect data.
*/
/* Put server RDMA address in local cdata */
memcpy(&cdata.addr, args->dstaddr, args->addrlen);
sap = (struct sockaddr *)&cdata.addr;
memcpy(sap, args->dstaddr, args->addrlen);
/* Ensure xprt->addr holds valid server TCP (not RDMA)
* address, for any side protocols which peek at it */
xprt->prot = IPPROTO_TCP;
xprt->addrlen = args->addrlen;
memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);
memcpy(&xprt->addr, sap, xprt->addrlen);
sin = (struct sockaddr_in *)&cdata.addr;
if (ntohs(sin->sin_port) != 0)
if (rpc_get_port(sap))
xprt_set_bound(xprt);
dprintk("RPC: %s: %pI4:%u\n",
__func__, &sin->sin_addr.s_addr, ntohs(sin->sin_port));
/* Set max requests */
cdata.max_requests = xprt->max_reqs;
/* Set some length limits */
cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
......@@ -375,8 +367,7 @@ xprt_setup_rdma(struct xprt_create *args)
new_xprt = rpcx_to_rdmax(xprt);
rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
xprt_rdma_memreg_strategy);
rc = rpcrdma_ia_open(new_xprt, sap, xprt_rdma_memreg_strategy);
if (rc)
goto out1;
......@@ -409,7 +400,7 @@ xprt_setup_rdma(struct xprt_create *args)
INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
xprt_rdma_connect_worker);
xprt_rdma_format_addresses(xprt);
xprt_rdma_format_addresses(xprt, sap);
xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt);
if (xprt->max_payload == 0)
goto out4;
......@@ -420,6 +411,9 @@ xprt_setup_rdma(struct xprt_create *args)
if (!try_module_get(THIS_MODULE))
goto out4;
dprintk("RPC: %s: %s:%s\n", __func__,
xprt->address_strings[RPC_DISPLAY_ADDR],
xprt->address_strings[RPC_DISPLAY_PORT]);
return xprt;
out4:
......@@ -653,31 +647,30 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
if (xprt_connected(xprt))
idle_time = (long)(jiffies - xprt->last_used) / HZ;
seq_printf(seq,
"\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
"%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",
0, /* need a local port? */
xprt->stat.bind_count,
xprt->stat.connect_count,
xprt->stat.connect_time,
idle_time,
xprt->stat.sends,
xprt->stat.recvs,
xprt->stat.bad_xids,
xprt->stat.req_u,
xprt->stat.bklog_u,
r_xprt->rx_stats.read_chunk_count,
r_xprt->rx_stats.write_chunk_count,
r_xprt->rx_stats.reply_chunk_count,
r_xprt->rx_stats.total_rdma_request,
r_xprt->rx_stats.total_rdma_reply,
r_xprt->rx_stats.pullup_copy_count,
r_xprt->rx_stats.fixup_copy_count,
r_xprt->rx_stats.hardway_register_count,
r_xprt->rx_stats.failed_marshal_count,
r_xprt->rx_stats.bad_reply_count);
seq_puts(seq, "\txprt:\trdma ");
seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
0, /* need a local port? */
xprt->stat.bind_count,
xprt->stat.connect_count,
xprt->stat.connect_time,
idle_time,
xprt->stat.sends,
xprt->stat.recvs,
xprt->stat.bad_xids,
xprt->stat.req_u,
xprt->stat.bklog_u);
seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu\n",
r_xprt->rx_stats.read_chunk_count,
r_xprt->rx_stats.write_chunk_count,
r_xprt->rx_stats.reply_chunk_count,
r_xprt->rx_stats.total_rdma_request,
r_xprt->rx_stats.total_rdma_reply,
r_xprt->rx_stats.pullup_copy_count,
r_xprt->rx_stats.fixup_copy_count,
r_xprt->rx_stats.hardway_register_count,
r_xprt->rx_stats.failed_marshal_count,
r_xprt->rx_stats.bad_reply_count,
r_xprt->rx_stats.nomsg_call_count);
}
static int
......
......@@ -52,6 +52,7 @@
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */
#include "xprt_rdma.h"
......@@ -414,6 +415,14 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
return 0;
}
static void rpcrdma_destroy_id(struct rdma_cm_id *id)
{
if (id) {
module_put(id->device->owner);
rdma_destroy_id(id);
}
}
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
struct rpcrdma_ia *ia, struct sockaddr *addr)
......@@ -440,6 +449,17 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
}
wait_for_completion_interruptible_timeout(&ia->ri_done,
msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
/* FIXME:
* Until xprtrdma supports DEVICE_REMOVAL, the provider must
* be pinned while there are active NFS/RDMA mounts to prevent
* hangs and crashes at umount time.
*/
if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
dprintk("RPC: %s: Failed to get device module\n",
__func__);
ia->ri_async_rc = -ENODEV;
}
rc = ia->ri_async_rc;
if (rc)
goto out;
......@@ -449,16 +469,17 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
if (rc) {
dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
__func__, rc);
goto out;
goto put;
}
wait_for_completion_interruptible_timeout(&ia->ri_done,
msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
rc = ia->ri_async_rc;
if (rc)
goto out;
goto put;
return id;
put:
module_put(id->device->owner);
out:
rdma_destroy_id(id);
return ERR_PTR(rc);
......@@ -493,9 +514,11 @@ rpcrdma_clean_cq(struct ib_cq *cq)
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
int rc, mem_priv;
struct rpcrdma_ia *ia = &xprt->rx_ia;
struct ib_device_attr *devattr = &ia->ri_devattr;
int rc;
ia->ri_dma_mr = NULL;
ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
if (IS_ERR(ia->ri_id)) {
......@@ -519,11 +542,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
goto out3;
}
if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
ia->ri_have_dma_lkey = 1;
ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
}
if (memreg == RPCRDMA_FRMR) {
/* Requires both frmr reg and local dma lkey */
if (((devattr->device_cap_flags &
......@@ -539,42 +557,19 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
if (!ia->ri_device->alloc_fmr) {
dprintk("RPC: %s: MTHCAFMR registration "
"not supported by HCA\n", __func__);
memreg = RPCRDMA_ALLPHYSICAL;
goto out3;
}
}
/*
* Optionally obtain an underlying physical identity mapping in
* order to do a memory window-based bind. This base registration
* is protected from remote access - that is enabled only by binding
* for the specific bytes targeted during each RPC operation, and
* revoked after the corresponding completion similar to a storage
* adapter.
*/
switch (memreg) {
case RPCRDMA_FRMR:
ia->ri_ops = &rpcrdma_frwr_memreg_ops;
break;
case RPCRDMA_ALLPHYSICAL:
ia->ri_ops = &rpcrdma_physical_memreg_ops;
mem_priv = IB_ACCESS_LOCAL_WRITE |
IB_ACCESS_REMOTE_WRITE |
IB_ACCESS_REMOTE_READ;
goto register_setup;
break;
case RPCRDMA_MTHCAFMR:
ia->ri_ops = &rpcrdma_fmr_memreg_ops;
if (ia->ri_have_dma_lkey)
break;
mem_priv = IB_ACCESS_LOCAL_WRITE;
register_setup:
ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
if (IS_ERR(ia->ri_bind_mem)) {
printk(KERN_ALERT "%s: ib_get_dma_mr for "
"phys register failed with %lX\n",
__func__, PTR_ERR(ia->ri_bind_mem));
rc = -ENOMEM;
goto out3;
}
break;
default:
printk(KERN_ERR "RPC: Unsupported memory "
......@@ -592,7 +587,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
ib_dealloc_pd(ia->ri_pd);
ia->ri_pd = NULL;
out2:
rdma_destroy_id(ia->ri_id);
rpcrdma_destroy_id(ia->ri_id);
ia->ri_id = NULL;
out1:
return rc;
......@@ -606,19 +601,11 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
int rc;
dprintk("RPC: %s: entering\n", __func__);
if (ia->ri_bind_mem != NULL) {
rc = ib_dereg_mr(ia->ri_bind_mem);
dprintk("RPC: %s: ib_dereg_mr returned %i\n",
__func__, rc);
}
if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
if (ia->ri_id->qp)
rdma_destroy_qp(ia->ri_id);
rdma_destroy_id(ia->ri_id);
rpcrdma_destroy_id(ia->ri_id);
ia->ri_id = NULL;
}
......@@ -639,6 +626,12 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct ib_cq_init_attr cq_attr = {};
int rc, err;
if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
dprintk("RPC: %s: insufficient sge's available\n",
__func__);
return -ENOMEM;
}
/* check provider's send/recv wr limits */
if (cdata->max_requests > devattr->max_qp_wr)
cdata->max_requests = devattr->max_qp_wr;
......@@ -651,21 +644,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
if (rc)
return rc;
ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
ep->rep_attr.cap.max_recv_sge = 1;
ep->rep_attr.cap.max_inline_data = 0;
ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
ep->rep_attr.qp_type = IB_QPT_RC;
ep->rep_attr.port_num = ~0;
if (cdata->padding) {
ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
GFP_KERNEL);
if (IS_ERR(ep->rep_padbuf))
return PTR_ERR(ep->rep_padbuf);
} else
ep->rep_padbuf = NULL;
dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
"iovs: send %d recv %d\n",
__func__,
......@@ -748,7 +733,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
dprintk("RPC: %s: ib_destroy_cq returned %i\n",
__func__, err);
out1:
rpcrdma_free_regbuf(ia, ep->rep_padbuf);
if (ia->ri_dma_mr)
ib_dereg_mr(ia->ri_dma_mr);
return rc;
}
......@@ -775,8 +761,6 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ia->ri_id->qp = NULL;
}
rpcrdma_free_regbuf(ia, ep->rep_padbuf);
rpcrdma_clean_cq(ep->rep_attr.recv_cq);
rc = ib_destroy_cq(ep->rep_attr.recv_cq);
if (rc)
......@@ -788,6 +772,12 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
if (rc)
dprintk("RPC: %s: ib_destroy_cq returned %i\n",
__func__, rc);
if (ia->ri_dma_mr) {
rc = ib_dereg_mr(ia->ri_dma_mr);
dprintk("RPC: %s: ib_dereg_mr returned %i\n",
__func__, rc);
}
}
/*
......@@ -825,7 +815,7 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
if (ia->ri_device != id->device) {
printk("RPC: %s: can't reconnect on "
"different device!\n", __func__);
rdma_destroy_id(id);
rpcrdma_destroy_id(id);
rc = -ENETUNREACH;
goto out;
}
......@@ -834,7 +824,7 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
if (rc) {
dprintk("RPC: %s: rdma_create_qp failed %i\n",
__func__, rc);
rdma_destroy_id(id);
rpcrdma_destroy_id(id);
rc = -ENETUNREACH;
goto out;
}
......@@ -845,7 +835,7 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
write_unlock(&ia->ri_qplock);
rdma_destroy_qp(old);
rdma_destroy_id(old);
rpcrdma_destroy_id(old);
} else {
dprintk("RPC: %s: connecting...\n", __func__);
rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
......@@ -1229,75 +1219,6 @@ rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
(unsigned long long)seg->mr_dma, seg->mr_dmalen);
}
static int
rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
struct ib_mr **mrp, struct ib_sge *iov)
{
struct ib_phys_buf ipb;
struct ib_mr *mr;
int rc;
/*
* All memory passed here was kmalloc'ed, therefore phys-contiguous.
*/
iov->addr = ib_dma_map_single(ia->ri_device,
va, len, DMA_BIDIRECTIONAL);
if (ib_dma_mapping_error(ia->ri_device, iov->addr))
return -ENOMEM;
iov->length = len;
if (ia->ri_have_dma_lkey) {
*mrp = NULL;
iov->lkey = ia->ri_dma_lkey;
return 0;
} else if (ia->ri_bind_mem != NULL) {
*mrp = NULL;
iov->lkey = ia->ri_bind_mem->lkey;
return 0;
}
ipb.addr = iov->addr;
ipb.size = iov->length;
mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
IB_ACCESS_LOCAL_WRITE, &iov->addr);
dprintk("RPC: %s: phys convert: 0x%llx "
"registered 0x%llx length %d\n",
__func__, (unsigned long long)ipb.addr,
(unsigned long long)iov->addr, len);
if (IS_ERR(mr)) {
*mrp = NULL;
rc = PTR_ERR(mr);
dprintk("RPC: %s: failed with %i\n", __func__, rc);
} else {
*mrp = mr;
iov->lkey = mr->lkey;
rc = 0;
}
return rc;
}
static int
rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
struct ib_mr *mr, struct ib_sge *iov)
{
int rc;
ib_dma_unmap_single(ia->ri_device,
iov->addr, iov->length, DMA_BIDIRECTIONAL);
if (NULL == mr)
return 0;
rc = ib_dereg_mr(mr);
if (rc)
dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
return rc;
}
/**
* rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
* @ia: controlling rpcrdma_ia
......@@ -1317,26 +1238,29 @@ struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
struct rpcrdma_regbuf *rb;
int rc;
struct ib_sge *iov;
rc = -ENOMEM;
rb = kmalloc(sizeof(*rb) + size, flags);
if (rb == NULL)
goto out;
rb->rg_size = size;
rb->rg_owner = NULL;
rc = rpcrdma_register_internal(ia, rb->rg_base, size,
&rb->rg_mr, &rb->rg_iov);
if (rc)
iov = &rb->rg_iov;
iov->addr = ib_dma_map_single(ia->ri_device,
(void *)rb->rg_base, size,
DMA_BIDIRECTIONAL);
if (ib_dma_mapping_error(ia->ri_device, iov->addr))
goto out_free;
iov->length = size;
iov->lkey = ia->ri_dma_lkey;
rb->rg_size = size;
rb->rg_owner = NULL;
return rb;
out_free:
kfree(rb);
out:
return ERR_PTR(rc);
return ERR_PTR(-ENOMEM);
}
/**
......@@ -1347,10 +1271,15 @@ rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
if (rb) {
rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
kfree(rb);
}
struct ib_sge *iov;
if (!rb)
return;
iov = &rb->rg_iov;
ib_dma_unmap_single(ia->ri_device,
iov->addr, iov->length, DMA_BIDIRECTIONAL);
kfree(rb);
}
/*
......@@ -1363,9 +1292,11 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_ep *ep,
struct rpcrdma_req *req)
{
struct ib_device *device = ia->ri_device;
struct ib_send_wr send_wr, *send_wr_fail;
struct rpcrdma_rep *rep = req->rl_reply;
int rc;
struct ib_sge *iov = req->rl_send_iov;
int i, rc;
if (rep) {
rc = rpcrdma_ep_post_recv(ia, ep, rep);
......@@ -1376,22 +1307,15 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
send_wr.next = NULL;
send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
send_wr.sg_list = req->rl_send_iov;
send_wr.sg_list = iov;
send_wr.num_sge = req->rl_niovs;
send_wr.opcode = IB_WR_SEND;
if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
ib_dma_sync_single_for_device(ia->ri_device,
req->rl_send_iov[3].addr,
req->rl_send_iov[3].length,
DMA_TO_DEVICE);
ib_dma_sync_single_for_device(ia->ri_device,
req->rl_send_iov[1].addr,
req->rl_send_iov[1].length,
DMA_TO_DEVICE);
ib_dma_sync_single_for_device(ia->ri_device,
req->rl_send_iov[0].addr,
req->rl_send_iov[0].length,
DMA_TO_DEVICE);
for (i = 0; i < send_wr.num_sge; i++)
ib_dma_sync_single_for_device(device, iov[i].addr,
iov[i].length, DMA_TO_DEVICE);
dprintk("RPC: %s: posting %d s/g entries\n",
__func__, send_wr.num_sge);
if (DECR_CQCOUNT(ep) > 0)
send_wr.send_flags = 0;
......
......@@ -65,9 +65,8 @@ struct rpcrdma_ia {
struct ib_device *ri_device;
struct rdma_cm_id *ri_id;
struct ib_pd *ri_pd;
struct ib_mr *ri_bind_mem;
struct ib_mr *ri_dma_mr;
u32 ri_dma_lkey;
int ri_have_dma_lkey;
struct completion ri_done;
int ri_async_rc;
unsigned int ri_max_frmr_depth;
......@@ -89,7 +88,6 @@ struct rpcrdma_ep {
int rep_connected;
struct ib_qp_init_attr rep_attr;
wait_queue_head_t rep_connect_wait;
struct rpcrdma_regbuf *rep_padbuf;
struct rdma_conn_param rep_remote_cma;
struct sockaddr_storage rep_remote_addr;
struct delayed_work rep_connect_worker;
......@@ -119,7 +117,6 @@ struct rpcrdma_ep {
struct rpcrdma_regbuf {
size_t rg_size;
struct rpcrdma_req *rg_owner;
struct ib_mr *rg_mr;
struct ib_sge rg_iov;
__be32 rg_base[0] __attribute__ ((aligned(256)));
};
......@@ -165,8 +162,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
* struct rpcrdma_buffer. N is the max number of outstanding requests.
*/
/* temporary static scatter/gather max */
#define RPCRDMA_MAX_DATA_SEGS (64) /* max scatter/gather */
#define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE)
#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
struct rpcrdma_buffer;
......@@ -258,16 +254,18 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
char *mr_offset; /* kva if no page, else offset */
};
#define RPCRDMA_MAX_IOVS (2)
struct rpcrdma_req {
unsigned int rl_niovs; /* 0, 2 or 4 */
unsigned int rl_nchunks; /* non-zero if chunks */
unsigned int rl_connect_cookie; /* retry detection */
struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
unsigned int rl_niovs;
unsigned int rl_nchunks;
unsigned int rl_connect_cookie;
struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
struct ib_sge rl_send_iov[4]; /* for active requests */
struct rpcrdma_regbuf *rl_rdmabuf;
struct rpcrdma_regbuf *rl_sendbuf;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS];
struct rpcrdma_regbuf *rl_rdmabuf;
struct rpcrdma_regbuf *rl_sendbuf;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
};
static inline struct rpcrdma_req *
......@@ -342,6 +340,7 @@ struct rpcrdma_stats {
unsigned long hardway_register_count;
unsigned long failed_marshal_count;
unsigned long bad_reply_count;
unsigned long nomsg_call_count;
};
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册