diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c
index d81f96aacd51e71b1da710b477e7d44ff9a90b24..656f68f7fe53e110a33f8bb876028e6083c3db20 100644
--- a/fs/nfs/callback_xdr.c
+++ b/fs/nfs/callback_xdr.c
@@ -925,7 +925,7 @@ static __be32 nfs4_callback_compound(struct svc_rqst *rqstp, void *argp, void *r
 	if (hdr_arg.minorversion == 0) {
 		cps.clp = nfs4_find_client_ident(SVC_NET(rqstp), hdr_arg.cb_ident);
 		if (!cps.clp || !check_gss_callback_principal(cps.clp, rqstp))
-			return rpc_drop_reply;
+			goto out_invalidcred;
 	}
 
 	cps.minorversion = hdr_arg.minorversion;
@@ -953,6 +953,10 @@ static __be32 nfs4_callback_compound(struct svc_rqst *rqstp, void *argp, void *r
 	nfs_put_client(cps.clp);
 	dprintk("%s: done, status = %u\n", __func__, ntohl(status));
 	return rpc_success;
+
+out_invalidcred:
+	pr_warn_ratelimited("NFS: NFSv4 callback contains invalid cred\n");
+	return rpc_autherr_badcred;
 }
 
 /*
diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
index 899791573a403ba8434c12a05b4594f23ac6c33b..3a40287b4d27ccb83455fa1f36533989bc569f90 100644
--- a/include/linux/sunrpc/auth.h
+++ b/include/linux/sunrpc/auth.h
@@ -107,6 +107,9 @@ struct rpc_auth {
 	/* per-flavor data */
 };
 
+/* rpc_auth au_flags */
+#define RPCAUTH_AUTH_DATATOUCH	0x00000002
+
 struct rpc_auth_create_args {
 	rpc_authflavor_t pseudoflavor;
 	const char *target_name;
diff --git a/include/linux/sunrpc/gss_api.h b/include/linux/sunrpc/gss_api.h
index 1f911ccb2a75655bee357d27076b455776d4d377..68ec78c1aa48e1ec3e4b285420a69732a2b05c6a 100644
--- a/include/linux/sunrpc/gss_api.h
+++ b/include/linux/sunrpc/gss_api.h
@@ -73,6 +73,7 @@ u32 gss_delete_sec_context(
 rpc_authflavor_t gss_svc_to_pseudoflavor(struct gss_api_mech *, u32 qop,
 					u32 service);
 u32 gss_pseudoflavor_to_service(struct gss_api_mech *, u32 pseudoflavor);
+bool gss_pseudoflavor_to_datatouch(struct gss_api_mech *, u32 pseudoflavor);
 char *gss_service_to_auth_domain_name(struct gss_api_mech *, u32 service);
 
 struct pf_desc {
@@ -81,6 +82,7 @@ struct pf_desc {
 	u32	service;
 	char	*name;
 	char	*auth_domain_name;
+	bool	datatouch;
 };
 
 /* Different mechanisms (e.g., krb5 or spkm3) may implement gss-api, and
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index e64ae93d5b4f618e216f2c55070ceef60bd737d5..bca3537efffdf39cd7bdf50bf4469d70d0576d9c 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1017,6 +1017,8 @@ gss_create_new(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 	auth->au_rslack = GSS_VERF_SLACK >> 2;
 	auth->au_ops = &authgss_ops;
 	auth->au_flavor = flavor;
+	if (gss_pseudoflavor_to_datatouch(gss_auth->mech, flavor))
+		auth->au_flags |= RPCAUTH_AUTH_DATATOUCH;
 	atomic_set(&auth->au_count, 1);
 	kref_init(&gss_auth->kref);
 
diff --git a/net/sunrpc/auth_gss/gss_krb5_mech.c b/net/sunrpc/auth_gss/gss_krb5_mech.c
index 65427492b1c95f681f79345e7ce29c62d941e147..60595835317afbdd1c2ccc657cd727ed11477a6b 100644
--- a/net/sunrpc/auth_gss/gss_krb5_mech.c
+++ b/net/sunrpc/auth_gss/gss_krb5_mech.c
@@ -745,12 +745,14 @@ static struct pf_desc gss_kerberos_pfs[] = {
 		.qop = GSS_C_QOP_DEFAULT,
 		.service = RPC_GSS_SVC_INTEGRITY,
 		.name = "krb5i",
+		.datatouch = true,
 	},
 	[2] = {
 		.pseudoflavor = RPC_AUTH_GSS_KRB5P,
 		.qop = GSS_C_QOP_DEFAULT,
 		.service = RPC_GSS_SVC_PRIVACY,
 		.name = "krb5p",
+		.datatouch = true,
 	},
 };
 
diff --git a/net/sunrpc/auth_gss/gss_mech_switch.c b/net/sunrpc/auth_gss/gss_mech_switch.c
index 7063d856a598e3c4f1201c5df26ce86c6158538d..5fec3abbe19bb640de31bf85bfc9becdeab5f359 100644
--- a/net/sunrpc/auth_gss/gss_mech_switch.c
+++ b/net/sunrpc/auth_gss/gss_mech_switch.c
@@ -361,6 +361,18 @@ gss_pseudoflavor_to_service(struct gss_api_mech *gm, u32 pseudoflavor)
 }
 EXPORT_SYMBOL(gss_pseudoflavor_to_service);
 
+bool
+gss_pseudoflavor_to_datatouch(struct gss_api_mech *gm, u32 pseudoflavor)
+{
+	int i;
+
+	for (i = 0; i < gm->gm_pf_num; i++) {
+		if (gm->gm_pfs[i].pseudoflavor == pseudoflavor)
+			return gm->gm_pfs[i].datatouch;
+	}
+	return false;
+}
+
 char *
 gss_service_to_auth_domain_name(struct gss_api_mech *gm, u32 service)
 {
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index cc9852897395c6f6db42653c773788446a9c5d3e..c5b0cb4f4056c4da0a0adc556cf46bebb48d2e7a 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1188,11 +1188,17 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
 		*statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
 
 		/* Encode reply */
-		if (test_bit(RQ_DROPME, &rqstp->rq_flags)) {
+		if (*statp == rpc_drop_reply ||
+		    test_bit(RQ_DROPME, &rqstp->rq_flags)) {
 			if (procp->pc_release)
 				procp->pc_release(rqstp, NULL, rqstp->rq_resp);
 			goto dropit;
 		}
+		if (*statp == rpc_autherr_badcred) {
+			if (procp->pc_release)
+				procp->pc_release(rqstp, NULL, rqstp->rq_resp);
+			goto err_bad_auth;
+		}
 		if (*statp == rpc_success &&
 		    (xdr = procp->pc_encode) &&
 		    !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index dc9f3b513a05f3a2fc9027359f9e1e587f9d243b..ef19fa42c50ff2e15ecdee7d87f1207ffc3c445c 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -1,7 +1,7 @@
 obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
 
 rpcrdma-y := transport.o rpc_rdma.o verbs.o \
-	fmr_ops.o frwr_ops.o physical_ops.o \
+	fmr_ops.o frwr_ops.o \
 	svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
 	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
 	module.o
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 6326ebe8b5951a95c3e488bee905f6fa6f00a62b..21cb3b150b371dc5e8aab5e02bc4af99a6a3d856 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -19,13 +19,6 @@
  * verb (fmr_op_unmap).
  */
 
-/* Transport recovery
- *
- * After a transport reconnect, fmr_op_map re-uses the MR already
- * allocated for the RPC, but generates a fresh rkey then maps the
- * MR again. This process is synchronous.
- */
-
 #include "xprt_rdma.h"
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -35,62 +28,132 @@
 /* Maximum scatter/gather per FMR */
 #define RPCRDMA_MAX_FMR_SGES	(64)
 
-static struct workqueue_struct *fmr_recovery_wq;
-
-#define FMR_RECOVERY_WQ_FLAGS		(WQ_UNBOUND)
+/* Access mode of externally registered pages */
+enum {
+	RPCRDMA_FMR_ACCESS_FLAGS	= IB_ACCESS_REMOTE_WRITE |
+					  IB_ACCESS_REMOTE_READ,
+};
 
-int
-fmr_alloc_recovery_wq(void)
+bool
+fmr_is_supported(struct rpcrdma_ia *ia)
 {
-	fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0);
-	return !fmr_recovery_wq ? -ENOMEM : 0;
+	if (!ia->ri_device->alloc_fmr) {
+		pr_info("rpcrdma: 'fmr' mode is not supported by device %s\n",
+			ia->ri_device->name);
+		return false;
+	}
+	return true;
 }
 
-void
-fmr_destroy_recovery_wq(void)
+static int
+fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *mw)
 {
-	struct workqueue_struct *wq;
+	static struct ib_fmr_attr fmr_attr = {
+		.max_pages	= RPCRDMA_MAX_FMR_SGES,
+		.max_maps	= 1,
+		.page_shift	= PAGE_SHIFT
+	};
 
-	if (!fmr_recovery_wq)
-		return;
+	mw->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES,
+				       sizeof(u64), GFP_KERNEL);
+	if (!mw->fmr.fm_physaddrs)
+		goto out_free;
 
-	wq = fmr_recovery_wq;
-	fmr_recovery_wq = NULL;
-	destroy_workqueue(wq);
+	mw->mw_sg = kcalloc(RPCRDMA_MAX_FMR_SGES,
+			    sizeof(*mw->mw_sg), GFP_KERNEL);
+	if (!mw->mw_sg)
+		goto out_free;
+
+	sg_init_table(mw->mw_sg, RPCRDMA_MAX_FMR_SGES);
+
+	mw->fmr.fm_mr = ib_alloc_fmr(ia->ri_pd, RPCRDMA_FMR_ACCESS_FLAGS,
+				     &fmr_attr);
+	if (IS_ERR(mw->fmr.fm_mr))
+		goto out_fmr_err;
+
+	return 0;
+
+out_fmr_err:
+	dprintk("RPC:       %s: ib_alloc_fmr returned %ld\n", __func__,
+		PTR_ERR(mw->fmr.fm_mr));
+
+out_free:
+	kfree(mw->mw_sg);
+	kfree(mw->fmr.fm_physaddrs);
+	return -ENOMEM;
 }
 
 static int
 __fmr_unmap(struct rpcrdma_mw *mw)
 {
 	LIST_HEAD(l);
+	int rc;
 
-	list_add(&mw->fmr.fmr->list, &l);
-	return ib_unmap_fmr(&l);
+	list_add(&mw->fmr.fm_mr->list, &l);
+	rc = ib_unmap_fmr(&l);
+	list_del_init(&mw->fmr.fm_mr->list);
+	return rc;
 }
 
-/* Deferred reset of a single FMR. Generate a fresh rkey by
- * replacing the MR. There's no recovery if this fails.
- */
 static void
-__fmr_recovery_worker(struct work_struct *work)
+fmr_op_release_mr(struct rpcrdma_mw *r)
 {
-	struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw,
-					    mw_work);
-	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+	LIST_HEAD(unmap_list);
+	int rc;
 
-	__fmr_unmap(mw);
-	rpcrdma_put_mw(r_xprt, mw);
-	return;
+	/* Ensure MW is not on any rl_registered list */
+	if (!list_empty(&r->mw_list))
+		list_del(&r->mw_list);
+
+	kfree(r->fmr.fm_physaddrs);
+	kfree(r->mw_sg);
+
+	/* In case this one was left mapped, try to unmap it
+	 * to prevent dealloc_fmr from failing with EBUSY
+	 */
+	rc = __fmr_unmap(r);
+	if (rc)
+		pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n",
+		       r, rc);
+
+	rc = ib_dealloc_fmr(r->fmr.fm_mr);
+	if (rc)
+		pr_err("rpcrdma: final ib_dealloc_fmr for %p returned %i\n",
+		       r, rc);
+
+	kfree(r);
 }
 
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
+/* Reset of a single FMR.
  */
 static void
-__fmr_queue_recovery(struct rpcrdma_mw *mw)
+fmr_op_recover_mr(struct rpcrdma_mw *mw)
 {
-	INIT_WORK(&mw->mw_work, __fmr_recovery_worker);
-	queue_work(fmr_recovery_wq, &mw->mw_work);
+	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+	int rc;
+
+	/* ORDER: invalidate first */
+	rc = __fmr_unmap(mw);
+
+	/* ORDER: then DMA unmap */
+	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+			mw->mw_sg, mw->mw_nents, mw->mw_dir);
+	if (rc)
+		goto out_release;
+
+	rpcrdma_put_mw(r_xprt, mw);
+	r_xprt->rx_stats.mrs_recovered++;
+	return;
+
+out_release:
+	pr_err("rpcrdma: FMR reset failed (%d), %p released\n", rc, mw);
+	r_xprt->rx_stats.mrs_orphaned++;
+
+	spin_lock(&r_xprt->rx_buf.rb_mwlock);
+	list_del(&mw->mw_all);
+	spin_unlock(&r_xprt->rx_buf.rb_mwlock);
+
+	fmr_op_release_mr(mw);
 }
 
 static int
@@ -112,86 +175,21 @@ fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
 		     RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES);
 }
 
-static int
-fmr_op_init(struct rpcrdma_xprt *r_xprt)
-{
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-	int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ;
-	struct ib_fmr_attr fmr_attr = {
-		.max_pages	= RPCRDMA_MAX_FMR_SGES,
-		.max_maps	= 1,
-		.page_shift	= PAGE_SHIFT
-	};
-	struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
-	struct rpcrdma_mw *r;
-	int i, rc;
-
-	spin_lock_init(&buf->rb_mwlock);
-	INIT_LIST_HEAD(&buf->rb_mws);
-	INIT_LIST_HEAD(&buf->rb_all);
-
-	i = max_t(int, RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, 1);
-	i += 2;				/* head + tail */
-	i *= buf->rb_max_requests;	/* one set for each RPC slot */
-	dprintk("RPC:       %s: initalizing %d FMRs\n", __func__, i);
-
-	rc = -ENOMEM;
-	while (i--) {
-		r = kzalloc(sizeof(*r), GFP_KERNEL);
-		if (!r)
-			goto out;
-
-		r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
-					   sizeof(u64), GFP_KERNEL);
-		if (!r->fmr.physaddrs)
-			goto out_free;
-
-		r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
-		if (IS_ERR(r->fmr.fmr))
-			goto out_fmr_err;
-
-		r->mw_xprt = r_xprt;
-		list_add(&r->mw_list, &buf->rb_mws);
-		list_add(&r->mw_all, &buf->rb_all);
-	}
-	return 0;
-
-out_fmr_err:
-	rc = PTR_ERR(r->fmr.fmr);
-	dprintk("RPC:       %s: ib_alloc_fmr status %i\n", __func__, rc);
-	kfree(r->fmr.physaddrs);
-out_free:
-	kfree(r);
-out:
-	return rc;
-}
-
 /* Use the ib_map_phys_fmr() verb to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
  */
 static int
 fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
-	   int nsegs, bool writing)
+	   int nsegs, bool writing, struct rpcrdma_mw **out)
 {
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct ib_device *device = ia->ri_device;
-	enum dma_data_direction direction = rpcrdma_data_dir(writing);
 	struct rpcrdma_mr_seg *seg1 = seg;
 	int len, pageoff, i, rc;
 	struct rpcrdma_mw *mw;
+	u64 *dma_pages;
 
-	mw = seg1->rl_mw;
-	seg1->rl_mw = NULL;
-	if (!mw) {
-		mw = rpcrdma_get_mw(r_xprt);
-		if (!mw)
-			return -ENOMEM;
-	} else {
-		/* this is a retransmit; generate a fresh rkey */
-		rc = __fmr_unmap(mw);
-		if (rc)
-			return rc;
-	}
+	mw = rpcrdma_get_mw(r_xprt);
+	if (!mw)
+		return -ENOBUFS;
 
 	pageoff = offset_in_page(seg1->mr_offset);
 	seg1->mr_offset -= pageoff;	/* start of page */
@@ -200,8 +198,14 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	if (nsegs > RPCRDMA_MAX_FMR_SGES)
 		nsegs = RPCRDMA_MAX_FMR_SGES;
 	for (i = 0; i < nsegs;) {
-		rpcrdma_map_one(device, seg, direction);
-		mw->fmr.physaddrs[i] = seg->mr_dma;
+		if (seg->mr_page)
+			sg_set_page(&mw->mw_sg[i],
+				    seg->mr_page,
+				    seg->mr_len,
+				    offset_in_page(seg->mr_offset));
+		else
+			sg_set_buf(&mw->mw_sg[i], seg->mr_offset,
+				   seg->mr_len);
 		len += seg->mr_len;
 		++seg;
 		++i;
@@ -210,49 +214,54 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
 			break;
 	}
-
-	rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs,
-			     i, seg1->mr_dma);
+	mw->mw_nents = i;
+	mw->mw_dir = rpcrdma_data_dir(writing);
+	if (i == 0)
+		goto out_dmamap_err;
+
+	if (!ib_dma_map_sg(r_xprt->rx_ia.ri_device,
+			   mw->mw_sg, mw->mw_nents, mw->mw_dir))
+		goto out_dmamap_err;
+
+	for (i = 0, dma_pages = mw->fmr.fm_physaddrs; i < mw->mw_nents; i++)
+		dma_pages[i] = sg_dma_address(&mw->mw_sg[i]);
+	rc = ib_map_phys_fmr(mw->fmr.fm_mr, dma_pages, mw->mw_nents,
+			     dma_pages[0]);
 	if (rc)
 		goto out_maperr;
 
-	seg1->rl_mw = mw;
-	seg1->mr_rkey = mw->fmr.fmr->rkey;
-	seg1->mr_base = seg1->mr_dma + pageoff;
-	seg1->mr_nsegs = i;
-	seg1->mr_len = len;
-	return i;
+	mw->mw_handle = mw->fmr.fm_mr->rkey;
+	mw->mw_length = len;
+	mw->mw_offset = dma_pages[0] + pageoff;
 
-out_maperr:
-	dprintk("RPC:       %s: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
-		__func__, len, (unsigned long long)seg1->mr_dma,
-		pageoff, i, rc);
-	while (i--)
-		rpcrdma_unmap_one(device, --seg);
-	return rc;
-}
+	*out = mw;
+	return mw->mw_nents;
 
-static void
-__fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
-{
-	struct ib_device *device = r_xprt->rx_ia.ri_device;
-	int nsegs = seg->mr_nsegs;
+out_dmamap_err:
+	pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n",
+	       mw->mw_sg, mw->mw_nents);
+	rpcrdma_defer_mr_recovery(mw);
+	return -EIO;
 
-	while (nsegs--)
-		rpcrdma_unmap_one(device, seg++);
+out_maperr:
+	pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
+	       len, (unsigned long long)dma_pages[0],
+	       pageoff, mw->mw_nents, rc);
+	rpcrdma_defer_mr_recovery(mw);
+	return -EIO;
 }
 
 /* Invalidate all memory regions that were registered for "req".
  *
  * Sleeps until it is safe for the host CPU to access the
  * previously mapped memory regions.
+ *
+ * Caller ensures that req->rl_registered is not empty.
  */
 static void
 fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
-	struct rpcrdma_mr_seg *seg;
-	unsigned int i, nchunks;
-	struct rpcrdma_mw *mw;
+	struct rpcrdma_mw *mw, *tmp;
 	LIST_HEAD(unmap_list);
 	int rc;
 
@@ -261,90 +270,54 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	/* ORDER: Invalidate all of the req's MRs first
 	 *
 	 * ib_unmap_fmr() is slow, so use a single call instead
-	 * of one call per mapped MR.
+	 * of one call per mapped FMR.
 	 */
-	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
-		seg = &req->rl_segments[i];
-		mw = seg->rl_mw;
-
-		list_add(&mw->fmr.fmr->list, &unmap_list);
-
-		i += seg->mr_nsegs;
-	}
+	list_for_each_entry(mw, &req->rl_registered, mw_list)
+		list_add_tail(&mw->fmr.fm_mr->list, &unmap_list);
 	rc = ib_unmap_fmr(&unmap_list);
 	if (rc)
-		pr_warn("%s: ib_unmap_fmr failed (%i)\n", __func__, rc);
+		goto out_reset;
 
 	/* ORDER: Now DMA unmap all of the req's MRs, and return
 	 * them to the free MW list.
 	 */
-	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
-		seg = &req->rl_segments[i];
+	list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
+		list_del_init(&mw->mw_list);
+		list_del_init(&mw->fmr.fm_mr->list);
+		ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+				mw->mw_sg, mw->mw_nents, mw->mw_dir);
+		rpcrdma_put_mw(r_xprt, mw);
+	}
 
-		__fmr_dma_unmap(r_xprt, seg);
-		rpcrdma_put_mw(r_xprt, seg->rl_mw);
+	return;
 
-		i += seg->mr_nsegs;
-		seg->mr_nsegs = 0;
-		seg->rl_mw = NULL;
-	}
+out_reset:
+	pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);
 
-	req->rl_nchunks = 0;
+	list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
+		list_del_init(&mw->fmr.fm_mr->list);
+		fmr_op_recover_mr(mw);
+	}
 }
 
 /* Use a slow, safe mechanism to invalidate all memory regions
  * that were registered for "req".
- *
- * In the asynchronous case, DMA unmapping occurs first here
- * because the rpcrdma_mr_seg is released immediately after this
- * call. It's contents won't be available in __fmr_dma_unmap later.
- * FIXME.
  */
 static void
 fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 		  bool sync)
 {
-	struct rpcrdma_mr_seg *seg;
 	struct rpcrdma_mw *mw;
-	unsigned int i;
-
-	for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
-		seg = &req->rl_segments[i];
-		mw = seg->rl_mw;
-
-		if (sync) {
-			/* ORDER */
-			__fmr_unmap(mw);
-			__fmr_dma_unmap(r_xprt, seg);
-			rpcrdma_put_mw(r_xprt, mw);
-		} else {
-			__fmr_dma_unmap(r_xprt, seg);
-			__fmr_queue_recovery(mw);
-		}
-
-		i += seg->mr_nsegs;
-		seg->mr_nsegs = 0;
-		seg->rl_mw = NULL;
-	}
-}
-
-static void
-fmr_op_destroy(struct rpcrdma_buffer *buf)
-{
-	struct rpcrdma_mw *r;
-	int rc;
-
-	while (!list_empty(&buf->rb_all)) {
-		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
-		list_del(&r->mw_all);
-		kfree(r->fmr.physaddrs);
 
-		rc = ib_dealloc_fmr(r->fmr.fmr);
-		if (rc)
-			dprintk("RPC:       %s: ib_dealloc_fmr failed %i\n",
-				__func__, rc);
+	while (!list_empty(&req->rl_registered)) {
+		mw = list_first_entry(&req->rl_registered,
+				      struct rpcrdma_mw, mw_list);
+		list_del_init(&mw->mw_list);
 
-		kfree(r);
+		if (sync)
+			fmr_op_recover_mr(mw);
+		else
+			rpcrdma_defer_mr_recovery(mw);
 	}
 }
 
@@ -352,9 +325,10 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
 	.ro_map				= fmr_op_map,
 	.ro_unmap_sync			= fmr_op_unmap_sync,
 	.ro_unmap_safe			= fmr_op_unmap_safe,
+	.ro_recover_mr			= fmr_op_recover_mr,
 	.ro_open			= fmr_op_open,
 	.ro_maxpages			= fmr_op_maxpages,
-	.ro_init			= fmr_op_init,
-	.ro_destroy			= fmr_op_destroy,
+	.ro_init_mr			= fmr_op_init_mr,
+	.ro_release_mr			= fmr_op_release_mr,
 	.ro_displayname			= "fmr",
 };
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index c0947544babeb976eea94ae4698ce955bd1a3c37..892b5e1d9b099b217fba65a26b47105a0c027c51 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -73,29 +73,71 @@
 # define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
-static struct workqueue_struct *frwr_recovery_wq;
-
-#define FRWR_RECOVERY_WQ_FLAGS		(WQ_UNBOUND | WQ_MEM_RECLAIM)
+bool
+frwr_is_supported(struct rpcrdma_ia *ia)
+{
+	struct ib_device_attr *attrs = &ia->ri_device->attrs;
+
+	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
+		goto out_not_supported;
+	if (attrs->max_fast_reg_page_list_len == 0)
+		goto out_not_supported;
+	return true;
+
+out_not_supported:
+	pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
+		ia->ri_device->name);
+	return false;
+}
 
-int
-frwr_alloc_recovery_wq(void)
+static int
+frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
 {
-	frwr_recovery_wq = alloc_workqueue("frwr_recovery",
-					   FRWR_RECOVERY_WQ_FLAGS, 0);
-	return !frwr_recovery_wq ? -ENOMEM : 0;
+	unsigned int depth = ia->ri_max_frmr_depth;
+	struct rpcrdma_frmr *f = &r->frmr;
+	int rc;
+
+	f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG, depth);
+	if (IS_ERR(f->fr_mr))
+		goto out_mr_err;
+
+	r->mw_sg = kcalloc(depth, sizeof(*r->mw_sg), GFP_KERNEL);
+	if (!r->mw_sg)
+		goto out_list_err;
+
+	sg_init_table(r->mw_sg, depth);
+	init_completion(&f->fr_linv_done);
+	return 0;
+
+out_mr_err:
+	rc = PTR_ERR(f->fr_mr);
+	dprintk("RPC:       %s: ib_alloc_mr status %i\n",
+		__func__, rc);
+	return rc;
+
+out_list_err:
+	rc = -ENOMEM;
+	dprintk("RPC:       %s: sg allocation failure\n",
+		__func__);
+	ib_dereg_mr(f->fr_mr);
+	return rc;
 }
 
-void
-frwr_destroy_recovery_wq(void)
+static void
+frwr_op_release_mr(struct rpcrdma_mw *r)
 {
-	struct workqueue_struct *wq;
+	int rc;
 
-	if (!frwr_recovery_wq)
-		return;
+	/* Ensure MW is not on any rl_registered list */
+	if (!list_empty(&r->mw_list))
+		list_del(&r->mw_list);
 
-	wq = frwr_recovery_wq;
-	frwr_recovery_wq = NULL;
-	destroy_workqueue(wq);
+	rc = ib_dereg_mr(r->frmr.fr_mr);
+	if (rc)
+		pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n",
+		       r, rc);
+	kfree(r->mw_sg);
+	kfree(r);
 }
 
 static int
@@ -124,93 +166,37 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
 	return 0;
 }
 
-static void
-__frwr_reset_and_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
-{
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct rpcrdma_frmr *f = &mw->frmr;
-	int rc;
-
-	rc = __frwr_reset_mr(ia, mw);
-	ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, f->fr_dir);
-	if (rc)
-		return;
-
-	rpcrdma_put_mw(r_xprt, mw);
-}
-
-/* Deferred reset of a single FRMR. Generate a fresh rkey by
- * replacing the MR.
+/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR.
  *
  * There's no recovery if this fails. The FRMR is abandoned, but
  * remains in rb_all. It will be cleaned up when the transport is
  * destroyed.
  */
 static void
-__frwr_recovery_worker(struct work_struct *work)
-{
-	struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
-					    mw_work);
-
-	__frwr_reset_and_unmap(r->mw_xprt, r);
-	return;
-}
-
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
-static void
-__frwr_queue_recovery(struct rpcrdma_mw *r)
-{
-	INIT_WORK(&r->mw_work, __frwr_recovery_worker);
-	queue_work(frwr_recovery_wq, &r->mw_work);
-}
-
-static int
-__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
-	    unsigned int depth)
+frwr_op_recover_mr(struct rpcrdma_mw *mw)
 {
-	struct rpcrdma_frmr *f = &r->frmr;
+	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	int rc;
 
-	f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
-	if (IS_ERR(f->fr_mr))
-		goto out_mr_err;
-
-	f->fr_sg = kcalloc(depth, sizeof(*f->fr_sg), GFP_KERNEL);
-	if (!f->fr_sg)
-		goto out_list_err;
-
-	sg_init_table(f->fr_sg, depth);
-
-	init_completion(&f->fr_linv_done);
-
-	return 0;
+	rc = __frwr_reset_mr(ia, mw);
+	ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir);
+	if (rc)
+		goto out_release;
 
-out_mr_err:
-	rc = PTR_ERR(f->fr_mr);
-	dprintk("RPC:       %s: ib_alloc_mr status %i\n",
-		__func__, rc);
-	return rc;
+	rpcrdma_put_mw(r_xprt, mw);
+	r_xprt->rx_stats.mrs_recovered++;
+	return;
 
-out_list_err:
-	rc = -ENOMEM;
-	dprintk("RPC:       %s: sg allocation failure\n",
-		__func__);
-	ib_dereg_mr(f->fr_mr);
-	return rc;
-}
+out_release:
+	pr_err("rpcrdma: FRMR reset failed %d, %p release\n", rc, mw);
+	r_xprt->rx_stats.mrs_orphaned++;
 
-static void
-__frwr_release(struct rpcrdma_mw *r)
-{
-	int rc;
+	spin_lock(&r_xprt->rx_buf.rb_mwlock);
+	list_del(&mw->mw_all);
+	spin_unlock(&r_xprt->rx_buf.rb_mwlock);
 
-	rc = ib_dereg_mr(r->frmr.fr_mr);
-	if (rc)
-		dprintk("RPC:       %s: ib_dereg_mr status %i\n",
-			__func__, rc);
-	kfree(r->frmr.fr_sg);
+	frwr_op_release_mr(mw);
 }
 
 static int
@@ -346,57 +332,14 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
 	complete_all(&frmr->fr_linv_done);
 }
 
-static int
-frwr_op_init(struct rpcrdma_xprt *r_xprt)
-{
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-	struct ib_device *device = r_xprt->rx_ia.ri_device;
-	unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
-	struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
-	int i;
-
-	spin_lock_init(&buf->rb_mwlock);
-	INIT_LIST_HEAD(&buf->rb_mws);
-	INIT_LIST_HEAD(&buf->rb_all);
-
-	i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1);
-	i += 2;				/* head + tail */
-	i *= buf->rb_max_requests;	/* one set for each RPC slot */
-	dprintk("RPC:       %s: initalizing %d FRMRs\n", __func__, i);
-
-	while (i--) {
-		struct rpcrdma_mw *r;
-		int rc;
-
-		r = kzalloc(sizeof(*r), GFP_KERNEL);
-		if (!r)
-			return -ENOMEM;
-
-		rc = __frwr_init(r, pd, device, depth);
-		if (rc) {
-			kfree(r);
-			return rc;
-		}
-
-		r->mw_xprt = r_xprt;
-		list_add(&r->mw_list, &buf->rb_mws);
-		list_add(&r->mw_all, &buf->rb_all);
-	}
-
-	return 0;
-}
-
-/* Post a FAST_REG Work Request to register a memory region
+/* Post a REG_MR Work Request to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
  */
 static int
 frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
-	    int nsegs, bool writing)
+	    int nsegs, bool writing, struct rpcrdma_mw **out)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct ib_device *device = ia->ri_device;
-	enum dma_data_direction direction = rpcrdma_data_dir(writing);
-	struct rpcrdma_mr_seg *seg1 = seg;
 	struct rpcrdma_mw *mw;
 	struct rpcrdma_frmr *frmr;
 	struct ib_mr *mr;
@@ -405,14 +348,13 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	int rc, i, n, dma_nents;
 	u8 key;
 
-	mw = seg1->rl_mw;
-	seg1->rl_mw = NULL;
+	mw = NULL;
 	do {
 		if (mw)
-			__frwr_queue_recovery(mw);
+			rpcrdma_defer_mr_recovery(mw);
 		mw = rpcrdma_get_mw(r_xprt);
 		if (!mw)
-			return -ENOMEM;
+			return -ENOBUFS;
 	} while (mw->frmr.fr_state != FRMR_IS_INVALID);
 	frmr = &mw->frmr;
 	frmr->fr_state = FRMR_IS_VALID;
@@ -421,15 +363,14 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 
 	if (nsegs > ia->ri_max_frmr_depth)
 		nsegs = ia->ri_max_frmr_depth;
-
 	for (i = 0; i < nsegs;) {
 		if (seg->mr_page)
-			sg_set_page(&frmr->fr_sg[i],
+			sg_set_page(&mw->mw_sg[i],
 				    seg->mr_page,
 				    seg->mr_len,
 				    offset_in_page(seg->mr_offset));
 		else
-			sg_set_buf(&frmr->fr_sg[i], seg->mr_offset,
+			sg_set_buf(&mw->mw_sg[i], seg->mr_offset,
 				   seg->mr_len);
 
 		++seg;
@@ -440,26 +381,22 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
 			break;
 	}
-	frmr->fr_nents = i;
-	frmr->fr_dir = direction;
-
-	dma_nents = ib_dma_map_sg(device, frmr->fr_sg, frmr->fr_nents, direction);
-	if (!dma_nents) {
-		pr_err("RPC:       %s: failed to dma map sg %p sg_nents %u\n",
-		       __func__, frmr->fr_sg, frmr->fr_nents);
-		return -ENOMEM;
-	}
+	mw->mw_nents = i;
+	mw->mw_dir = rpcrdma_data_dir(writing);
+	if (i == 0)
+		goto out_dmamap_err;
 
-	n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, NULL, PAGE_SIZE);
-	if (unlikely(n != frmr->fr_nents)) {
-		pr_err("RPC:       %s: failed to map mr %p (%u/%u)\n",
-		       __func__, frmr->fr_mr, n, frmr->fr_nents);
-		rc = n < 0 ? n : -EINVAL;
-		goto out_senderr;
-	}
+	dma_nents = ib_dma_map_sg(ia->ri_device,
+				  mw->mw_sg, mw->mw_nents, mw->mw_dir);
+	if (!dma_nents)
+		goto out_dmamap_err;
+
+	n = ib_map_mr_sg(mr, mw->mw_sg, mw->mw_nents, NULL, PAGE_SIZE);
+	if (unlikely(n != mw->mw_nents))
+		goto out_mapmr_err;
 
 	dprintk("RPC:       %s: Using frmr %p to map %u segments (%u bytes)\n",
-		__func__, mw, frmr->fr_nents, mr->length);
+		__func__, mw, mw->mw_nents, mr->length);
 
 	key = (u8)(mr->rkey & 0x000000FF);
 	ib_update_fast_reg_key(mr, ++key);
@@ -481,24 +418,34 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	if (rc)
 		goto out_senderr;
 
-	seg1->rl_mw = mw;
-	seg1->mr_rkey = mr->rkey;
-	seg1->mr_base = mr->iova;
-	seg1->mr_nsegs = frmr->fr_nents;
-	seg1->mr_len = mr->length;
+	mw->mw_handle = mr->rkey;
+	mw->mw_length = mr->length;
+	mw->mw_offset = mr->iova;
+
+	*out = mw;
+	return mw->mw_nents;
 
-	return frmr->fr_nents;
+out_dmamap_err:
+	pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n",
+	       mw->mw_sg, mw->mw_nents);
+	rpcrdma_defer_mr_recovery(mw);
+	return -EIO;
+
+out_mapmr_err:
+	pr_err("rpcrdma: failed to map mr %p (%u/%u)\n",
+	       frmr->fr_mr, n, mw->mw_nents);
+	rpcrdma_defer_mr_recovery(mw);
+	return -EIO;
 
 out_senderr:
-	dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
-	__frwr_queue_recovery(mw);
-	return rc;
+	pr_err("rpcrdma: FRMR registration ib_post_send returned %i\n", rc);
+	rpcrdma_defer_mr_recovery(mw);
+	return -ENOTCONN;
 }
 
 static struct ib_send_wr *
-__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
+__frwr_prepare_linv_wr(struct rpcrdma_mw *mw)
 {
-	struct rpcrdma_mw *mw = seg->rl_mw;
 	struct rpcrdma_frmr *f = &mw->frmr;
 	struct ib_send_wr *invalidate_wr;
 
@@ -518,16 +465,16 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
  *
  * Sleeps until it is safe for the host CPU to access the
  * previously mapped memory regions.
+ *
+ * Caller ensures that req->rl_registered is not empty.
  */
 static void
 frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
 	struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct rpcrdma_mr_seg *seg;
-	unsigned int i, nchunks;
+	struct rpcrdma_mw *mw, *tmp;
 	struct rpcrdma_frmr *f;
-	struct rpcrdma_mw *mw;
 	int rc;
 
 	dprintk("RPC:       %s: req %p\n", __func__, req);
@@ -537,22 +484,18 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * Chain the LOCAL_INV Work Requests and post them with
 	 * a single ib_post_send() call.
 	 */
+	f = NULL;
 	invalidate_wrs = pos = prev = NULL;
-	seg = NULL;
-	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
-		seg = &req->rl_segments[i];
-
-		pos = __frwr_prepare_linv_wr(seg);
+	list_for_each_entry(mw, &req->rl_registered, mw_list) {
+		pos = __frwr_prepare_linv_wr(mw);
 
 		if (!invalidate_wrs)
 			invalidate_wrs = pos;
 		else
 			prev->next = pos;
 		prev = pos;
-
-		i += seg->mr_nsegs;
+		f = &mw->frmr;
 	}
-	f = &seg->rl_mw->frmr;
 
 	/* Strong send queue ordering guarantees that when the
 	 * last WR in the chain completes, all WRs in the chain
@@ -577,39 +520,27 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * them to the free MW list.
 	 */
 unmap:
-	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
-		seg = &req->rl_segments[i];
-		mw = seg->rl_mw;
-		seg->rl_mw = NULL;
-
-		ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents,
-				f->fr_dir);
+	list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
+		list_del_init(&mw->mw_list);
+		ib_dma_unmap_sg(ia->ri_device,
+				mw->mw_sg, mw->mw_nents, mw->mw_dir);
 		rpcrdma_put_mw(r_xprt, mw);
-
-		i += seg->mr_nsegs;
-		seg->mr_nsegs = 0;
 	}
-
-	req->rl_nchunks = 0;
 	return;
 
 reset_mrs:
-	pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
+	pr_err("rpcrdma: FRMR invalidate ib_post_send returned %i\n", rc);
+	rdma_disconnect(ia->ri_id);
 
 	/* Find and reset the MRs in the LOCAL_INV WRs that did not
 	 * get posted. This is synchronous, and slow.
 	 */
-	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
-		seg = &req->rl_segments[i];
-		mw = seg->rl_mw;
+	list_for_each_entry(mw, &req->rl_registered, mw_list) {
 		f = &mw->frmr;
-
 		if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) {
 			__frwr_reset_mr(ia, mw);
 			bad_wr = bad_wr->next;
 		}
-
-		i += seg->mr_nsegs;
 	}
 	goto unmap;
 }
@@ -621,38 +552,17 @@ static void
 frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 		   bool sync)
 {
-	struct rpcrdma_mr_seg *seg;
 	struct rpcrdma_mw *mw;
-	unsigned int i;
 
-	for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
-		seg = &req->rl_segments[i];
-		mw = seg->rl_mw;
+	while (!list_empty(&req->rl_registered)) {
+		mw = list_first_entry(&req->rl_registered,
+				      struct rpcrdma_mw, mw_list);
+		list_del_init(&mw->mw_list);
 
 		if (sync)
-			__frwr_reset_and_unmap(r_xprt, mw);
+			frwr_op_recover_mr(mw);
 		else
-			__frwr_queue_recovery(mw);
-
-		i += seg->mr_nsegs;
-		seg->mr_nsegs = 0;
-		seg->rl_mw = NULL;
-	}
-}
-
-static void
-frwr_op_destroy(struct rpcrdma_buffer *buf)
-{
-	struct rpcrdma_mw *r;
-
-	/* Ensure stale MWs for "buf" are no longer in flight */
-	flush_workqueue(frwr_recovery_wq);
-
-	while (!list_empty(&buf->rb_all)) {
-		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
-		list_del(&r->mw_all);
-		__frwr_release(r);
-		kfree(r);
+			rpcrdma_defer_mr_recovery(mw);
 	}
 }
 
@@ -660,9 +570,10 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
 	.ro_map				= frwr_op_map,
 	.ro_unmap_sync			= frwr_op_unmap_sync,
 	.ro_unmap_safe			= frwr_op_unmap_safe,
+	.ro_recover_mr			= frwr_op_recover_mr,
 	.ro_open			= frwr_op_open,
 	.ro_maxpages			= frwr_op_maxpages,
-	.ro_init			= frwr_op_init,
-	.ro_destroy			= frwr_op_destroy,
+	.ro_init_mr			= frwr_op_init_mr,
+	.ro_release_mr			= frwr_op_release_mr,
 	.ro_displayname			= "frwr",
 };
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
deleted file mode 100644
index 3750596cc432038ca524604e43479eb0af2012c5..0000000000000000000000000000000000000000
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright (c) 2015 Oracle.  All rights reserved.
- * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
- */
-
-/* No-op chunk preparation. All client memory is pre-registered.
- * Sometimes referred to as ALLPHYSICAL mode.
- *
- * Physical registration is simple because all client memory is
- * pre-registered and never deregistered. This mode is good for
- * adapter bring up, but is considered not safe: the server is
- * trusted not to abuse its access to client memory not involved
- * in RDMA I/O.
- */
-
-#include "xprt_rdma.h"
-
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-# define RPCDBG_FACILITY	RPCDBG_TRANS
-#endif
-
-static int
-physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
-		 struct rpcrdma_create_data_internal *cdata)
-{
-	struct ib_mr *mr;
-
-	/* Obtain an rkey to use for RPC data payloads.
-	 */
-	mr = ib_get_dma_mr(ia->ri_pd,
-			   IB_ACCESS_LOCAL_WRITE |
-			   IB_ACCESS_REMOTE_WRITE |
-			   IB_ACCESS_REMOTE_READ);
-	if (IS_ERR(mr)) {
-		pr_err("%s: ib_get_dma_mr for failed with %lX\n",
-		       __func__, PTR_ERR(mr));
-		return -ENOMEM;
-	}
-	ia->ri_dma_mr = mr;
-
-	rpcrdma_set_max_header_sizes(ia, cdata, min_t(unsigned int,
-						      RPCRDMA_MAX_DATA_SEGS,
-						      RPCRDMA_MAX_HDR_SEGS));
-	return 0;
-}
-
-/* PHYSICAL memory registration conveys one page per chunk segment.
- */
-static size_t
-physical_op_maxpages(struct rpcrdma_xprt *r_xprt)
-{
-	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-		     RPCRDMA_MAX_HDR_SEGS);
-}
-
-static int
-physical_op_init(struct rpcrdma_xprt *r_xprt)
-{
-	return 0;
-}
-
-/* The client's physical memory is already exposed for
- * remote access via RDMA READ or RDMA WRITE.
- */
-static int
-physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
-		int nsegs, bool writing)
-{
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-
-	rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing));
-	seg->mr_rkey = ia->ri_dma_mr->rkey;
-	seg->mr_base = seg->mr_dma;
-	return 1;
-}
-
-/* DMA unmap all memory regions that were mapped for "req".
- */
-static void
-physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
-{
-	struct ib_device *device = r_xprt->rx_ia.ri_device;
-	unsigned int i;
-
-	for (i = 0; req->rl_nchunks; --req->rl_nchunks)
-		rpcrdma_unmap_one(device, &req->rl_segments[i++]);
-}
-
-/* Use a slow, safe mechanism to invalidate all memory regions
- * that were registered for "req".
- *
- * For physical memory registration, there is no good way to
- * fence a single MR that has been advertised to the server. The
- * client has already handed the server an R_key that cannot be
- * invalidated and is shared by all MRs on this connection.
- * Tearing down the PD might be the only safe choice, but it's
- * not clear that a freshly acquired DMA R_key would be different
- * than the one used by the PD that was just destroyed.
- * FIXME.
- */
-static void
-physical_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-		       bool sync)
-{
-	physical_op_unmap_sync(r_xprt, req);
-}
-
-static void
-physical_op_destroy(struct rpcrdma_buffer *buf)
-{
-}
-
-const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = {
-	.ro_map				= physical_op_map,
-	.ro_unmap_sync			= physical_op_unmap_sync,
-	.ro_unmap_safe			= physical_op_unmap_safe,
-	.ro_open			= physical_op_open,
-	.ro_maxpages			= physical_op_maxpages,
-	.ro_init			= physical_op_init,
-	.ro_destroy			= physical_op_destroy,
-	.ro_displayname			= "physical",
-};
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 35a81096e83d50bd501726ed1d9376a5e4bcf54d..a47f170b20ef88d1ebe1f9ca406374dee1b84102 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -196,8 +196,7 @@ rpcrdma_tail_pullup(struct xdr_buf *buf)
  * MR when they can.
  */
 static int
-rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
-		     int n, int nsegs)
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
 {
 	size_t page_offset;
 	u32 remaining;
@@ -206,7 +205,7 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
 	base = vec->iov_base;
 	page_offset = offset_in_page(base);
 	remaining = vec->iov_len;
-	while (remaining && n < nsegs) {
+	while (remaining && n < RPCRDMA_MAX_SEGS) {
 		seg[n].mr_page = NULL;
 		seg[n].mr_offset = base;
 		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
@@ -230,34 +229,34 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
 
 static int
 rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
-	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
+	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
 {
-	int len, n = 0, p;
-	int page_base;
+	int len, n, p, page_base;
 	struct page **ppages;
 
+	n = 0;
 	if (pos == 0) {
-		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
-		if (n == nsegs)
-			return -EIO;
+		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
+		if (n == RPCRDMA_MAX_SEGS)
+			goto out_overflow;
 	}
 
 	len = xdrbuf->page_len;
 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
 	page_base = xdrbuf->page_base & ~PAGE_MASK;
 	p = 0;
-	while (len && n < nsegs) {
+	while (len && n < RPCRDMA_MAX_SEGS) {
 		if (!ppages[p]) {
 			/* alloc the pagelist for receiving buffer */
 			ppages[p] = alloc_page(GFP_ATOMIC);
 			if (!ppages[p])
-				return -ENOMEM;
+				return -EAGAIN;
 		}
 		seg[n].mr_page = ppages[p];
 		seg[n].mr_offset = (void *)(unsigned long) page_base;
 		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
 		if (seg[n].mr_len > PAGE_SIZE)
-			return -EIO;
+			goto out_overflow;
 		len -= seg[n].mr_len;
 		++n;
 		++p;
@@ -265,8 +264,8 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 	}
 
 	/* Message overflows the seg array */
-	if (len && n == nsegs)
-		return -EIO;
+	if (len && n == RPCRDMA_MAX_SEGS)
+		goto out_overflow;
 
 	/* When encoding the read list, the tail is always sent inline */
 	if (type == rpcrdma_readch)
@@ -277,20 +276,24 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 		 * xdr pad bytes, saving the server an RDMA operation. */
 		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
 			return n;
-		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
-		if (n == nsegs)
-			return -EIO;
+		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
+		if (n == RPCRDMA_MAX_SEGS)
+			goto out_overflow;
 	}
 
 	return n;
+
+out_overflow:
+	pr_err("rpcrdma: segment array overflow\n");
+	return -EIO;
 }
 
 static inline __be32 *
-xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
+xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
 {
-	*iptr++ = cpu_to_be32(seg->mr_rkey);
-	*iptr++ = cpu_to_be32(seg->mr_len);
-	return xdr_encode_hyper(iptr, seg->mr_base);
+	*iptr++ = cpu_to_be32(mw->mw_handle);
+	*iptr++ = cpu_to_be32(mw->mw_length);
+	return xdr_encode_hyper(iptr, mw->mw_offset);
 }
 
 /* XDR-encode the Read list. Supports encoding a list of read
@@ -310,7 +313,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
 			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
 			 __be32 *iptr, enum rpcrdma_chunktype rtype)
 {
-	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	struct rpcrdma_mr_seg *seg;
+	struct rpcrdma_mw *mw;
 	unsigned int pos;
 	int n, nsegs;
 
@@ -322,15 +326,17 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
 	pos = rqst->rq_snd_buf.head[0].iov_len;
 	if (rtype == rpcrdma_areadch)
 		pos = 0;
-	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
-				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+	seg = req->rl_segments;
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
 	if (nsegs < 0)
 		return ERR_PTR(nsegs);
 
 	do {
-		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
-		if (n <= 0)
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+						 false, &mw);
+		if (n < 0)
 			return ERR_PTR(n);
+		list_add(&mw->mw_list, &req->rl_registered);
 
 		*iptr++ = xdr_one;	/* item present */
 
@@ -338,20 +344,17 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
 		 * have the same "position".
 		 */
 		*iptr++ = cpu_to_be32(pos);
-		iptr = xdr_encode_rdma_segment(iptr, seg);
+		iptr = xdr_encode_rdma_segment(iptr, mw);
 
-		dprintk("RPC: %5u %s: read segment pos %u "
-			"%d@0x%016llx:0x%08x (%s)\n",
+		dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
 			rqst->rq_task->tk_pid, __func__, pos,
-			seg->mr_len, (unsigned long long)seg->mr_base,
-			seg->mr_rkey, n < nsegs ? "more" : "last");
+			mw->mw_length, (unsigned long long)mw->mw_offset,
+			mw->mw_handle, n < nsegs ? "more" : "last");
 
 		r_xprt->rx_stats.read_chunk_count++;
-		req->rl_nchunks++;
 		seg += n;
 		nsegs -= n;
 	} while (nsegs);
-	req->rl_nextseg = seg;
 
 	/* Finish Read list */
 	*iptr++ = xdr_zero;	/* Next item not present */
@@ -375,7 +378,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 			  struct rpc_rqst *rqst, __be32 *iptr,
 			  enum rpcrdma_chunktype wtype)
 {
-	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	struct rpcrdma_mr_seg *seg;
+	struct rpcrdma_mw *mw;
 	int n, nsegs, nchunks;
 	__be32 *segcount;
 
@@ -384,10 +388,10 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 		return iptr;
 	}
 
+	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
 				     rqst->rq_rcv_buf.head[0].iov_len,
-				     wtype, seg,
-				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+				     wtype, seg);
 	if (nsegs < 0)
 		return ERR_PTR(nsegs);
 
@@ -396,26 +400,25 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 
 	nchunks = 0;
 	do {
-		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
-		if (n <= 0)
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+						 true, &mw);
+		if (n < 0)
 			return ERR_PTR(n);
+		list_add(&mw->mw_list, &req->rl_registered);
 
-		iptr = xdr_encode_rdma_segment(iptr, seg);
+		iptr = xdr_encode_rdma_segment(iptr, mw);
 
-		dprintk("RPC: %5u %s: write segment "
-			"%d@0x016%llx:0x%08x (%s)\n",
+		dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n",
 			rqst->rq_task->tk_pid, __func__,
-			seg->mr_len, (unsigned long long)seg->mr_base,
-			seg->mr_rkey, n < nsegs ? "more" : "last");
+			mw->mw_length, (unsigned long long)mw->mw_offset,
+			mw->mw_handle, n < nsegs ? "more" : "last");
 
 		r_xprt->rx_stats.write_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
-		req->rl_nchunks++;
 		nchunks++;
 		seg   += n;
 		nsegs -= n;
 	} while (nsegs);
-	req->rl_nextseg = seg;
 
 	/* Update count of segments in this Write chunk */
 	*segcount = cpu_to_be32(nchunks);
@@ -442,7 +445,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
 			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
 			   __be32 *iptr, enum rpcrdma_chunktype wtype)
 {
-	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	struct rpcrdma_mr_seg *seg;
+	struct rpcrdma_mw *mw;
 	int n, nsegs, nchunks;
 	__be32 *segcount;
 
@@ -451,8 +455,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
 		return iptr;
 	}
 
-	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
-				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+	seg = req->rl_segments;
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
 	if (nsegs < 0)
 		return ERR_PTR(nsegs);
 
@@ -461,26 +465,25 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
 
 	nchunks = 0;
 	do {
-		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
-		if (n <= 0)
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+						 true, &mw);
+		if (n < 0)
 			return ERR_PTR(n);
+		list_add(&mw->mw_list, &req->rl_registered);
 
-		iptr = xdr_encode_rdma_segment(iptr, seg);
+		iptr = xdr_encode_rdma_segment(iptr, mw);
 
-		dprintk("RPC: %5u %s: reply segment "
-			"%d@0x%016llx:0x%08x (%s)\n",
+		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
 			rqst->rq_task->tk_pid, __func__,
-			seg->mr_len, (unsigned long long)seg->mr_base,
-			seg->mr_rkey, n < nsegs ? "more" : "last");
+			mw->mw_length, (unsigned long long)mw->mw_offset,
+			mw->mw_handle, n < nsegs ? "more" : "last");
 
 		r_xprt->rx_stats.reply_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
-		req->rl_nchunks++;
 		nchunks++;
 		seg   += n;
 		nsegs -= n;
 	} while (nsegs);
-	req->rl_nextseg = seg;
 
 	/* Update count of segments in the Reply chunk */
 	*segcount = cpu_to_be32(nchunks);
@@ -567,6 +570,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	enum rpcrdma_chunktype rtype, wtype;
 	struct rpcrdma_msg *headerp;
+	bool ddp_allowed;
 	ssize_t hdrlen;
 	size_t rpclen;
 	__be32 *iptr;
@@ -583,6 +587,13 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
 	headerp->rm_type = rdma_msg;
 
+	/* When the ULP employs a GSS flavor that guarantees integrity
+	 * or privacy, direct data placement of individual data items
+	 * is not allowed.
+	 */
+	ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
+						RPCAUTH_AUTH_DATATOUCH);
+
 	/*
 	 * Chunks needed for results?
 	 *
@@ -594,7 +605,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 */
 	if (rpcrdma_results_inline(r_xprt, rqst))
 		wtype = rpcrdma_noch;
-	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
+	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
 		wtype = rpcrdma_writech;
 	else
 		wtype = rpcrdma_replych;
@@ -617,7 +628,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		rtype = rpcrdma_noch;
 		rpcrdma_inline_pullup(rqst);
 		rpclen = rqst->rq_svec[0].iov_len;
-	} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
+	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
 		rtype = rpcrdma_readch;
 		rpclen = rqst->rq_svec[0].iov_len;
 		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
@@ -650,8 +661,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 * send a Call message with a Position Zero Read chunk and a
 	 * regular Read chunk at the same time.
 	 */
-	req->rl_nchunks = 0;
-	req->rl_nextseg = req->rl_segments;
 	iptr = headerp->rm_body.rm_chunks;
 	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
 	if (IS_ERR(iptr))
@@ -690,10 +699,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 out_overflow:
 	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
 		hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
-	/* Terminate this RPC. Chunks registered above will be
-	 * released by xprt_release -> xprt_rmda_free .
-	 */
-	return -EIO;
+	iptr = ERR_PTR(-EIO);
 
 out_unmap:
 	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
@@ -705,15 +711,13 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
  * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
  */
 static int
-rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp)
+rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
 {
 	unsigned int i, total_len;
 	struct rpcrdma_write_chunk *cur_wchunk;
 	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);
 
 	i = be32_to_cpu(**iptrp);
-	if (i > max)
-		return -1;
 	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
 	total_len = 0;
 	while (i--) {
@@ -744,45 +748,66 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
 	return total_len;
 }
 
-/*
- * Scatter inline received data back into provided iov's.
+/**
+ * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
+ * @rqst: controlling RPC request
+ * @srcp: points to RPC message payload in receive buffer
+ * @copy_len: remaining length of receive buffer content
+ * @pad: Write chunk pad bytes needed (zero for pure inline)
+ *
+ * The upper layer has set the maximum number of bytes it can
+ * receive in each component of rq_rcv_buf. These values are set in
+ * the head.iov_len, page_len, tail.iov_len, and buflen fields.
+ *
+ * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
+ * many cases this function simply updates iov_base pointers in
+ * rq_rcv_buf to point directly to the received reply data, to
+ * avoid copying reply data.
+ *
+ * Returns the count of bytes which had to be memcopied.
  */
-static void
+static unsigned long
 rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 {
-	int i, npages, curlen, olen;
+	unsigned long fixup_copy_count;
+	int i, npages, curlen;
 	char *destp;
 	struct page **ppages;
 	int page_base;
 
+	/* The head iovec is redirected to the RPC reply message
+	 * in the receive buffer, to avoid a memcopy.
+	 */
+	rqst->rq_rcv_buf.head[0].iov_base = srcp;
+	rqst->rq_private_buf.head[0].iov_base = srcp;
+
+	/* The contents of the receive buffer that follow
+	 * head.iov_len bytes are copied into the page list.
+	 */
 	curlen = rqst->rq_rcv_buf.head[0].iov_len;
-	if (curlen > copy_len) {	/* write chunk header fixup */
+	if (curlen > copy_len)
 		curlen = copy_len;
-		rqst->rq_rcv_buf.head[0].iov_len = curlen;
-	}
-
 	dprintk("RPC:       %s: srcp 0x%p len %d hdrlen %d\n",
 		__func__, srcp, copy_len, curlen);
-
-	/* Shift pointer for first receive segment only */
-	rqst->rq_rcv_buf.head[0].iov_base = srcp;
 	srcp += curlen;
 	copy_len -= curlen;
 
-	olen = copy_len;
-	i = 0;
-	rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
 	page_base = rqst->rq_rcv_buf.page_base;
 	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
 	page_base &= ~PAGE_MASK;
-
+	fixup_copy_count = 0;
 	if (copy_len && rqst->rq_rcv_buf.page_len) {
-		npages = PAGE_ALIGN(page_base +
-			rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
-		for (; i < npages; i++) {
+		int pagelist_len;
+
+		pagelist_len = rqst->rq_rcv_buf.page_len;
+		if (pagelist_len > copy_len)
+			pagelist_len = copy_len;
+		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
+		for (i = 0; i < npages; i++) {
 			curlen = PAGE_SIZE - page_base;
-			if (curlen > copy_len)
-				curlen = copy_len;
+			if (curlen > pagelist_len)
+				curlen = pagelist_len;
+
 			dprintk("RPC:       %s: page %d"
 				" srcp 0x%p len %d curlen %d\n",
 				__func__, i, srcp, copy_len, curlen);
@@ -792,39 +817,32 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 			kunmap_atomic(destp);
 			srcp += curlen;
 			copy_len -= curlen;
-			if (copy_len == 0)
+			fixup_copy_count += curlen;
+			pagelist_len -= curlen;
+			if (!pagelist_len)
 				break;
 			page_base = 0;
 		}
-	}
 
-	if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
-		curlen = copy_len;
-		if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
-			curlen = rqst->rq_rcv_buf.tail[0].iov_len;
-		if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
-			memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
-		dprintk("RPC:       %s: tail srcp 0x%p len %d curlen %d\n",
-			__func__, srcp, copy_len, curlen);
-		rqst->rq_rcv_buf.tail[0].iov_len = curlen;
-		copy_len -= curlen; ++i;
-	} else
-		rqst->rq_rcv_buf.tail[0].iov_len = 0;
-
-	if (pad) {
-		/* implicit padding on terminal chunk */
-		unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base;
-		while (pad--)
-			p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0;
+		/* Implicit padding for the last segment in a Write
+		 * chunk is inserted inline at the front of the tail
+		 * iovec. The upper layer ignores the content of
+		 * the pad. Simply ensure inline content in the tail
+		 * that follows the Write chunk is properly aligned.
+		 */
+		if (pad)
+			srcp -= pad;
 	}
 
-	if (copy_len)
-		dprintk("RPC:       %s: %d bytes in"
-			" %d extra segments (%d lost)\n",
-			__func__, olen, i, copy_len);
+	/* The tail iovec is redirected to the remaining data
+	 * in the receive buffer, to avoid a memcopy.
+	 */
+	if (copy_len || pad) {
+		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
+		rqst->rq_private_buf.tail[0].iov_base = srcp;
+	}
 
-	/* TBD avoid a warning from call_decode() */
-	rqst->rq_private_buf = rqst->rq_rcv_buf;
+	return fixup_copy_count;
 }
 
 void
@@ -960,14 +978,13 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
 		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
 		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
-		     req->rl_nchunks == 0))
+		     list_empty(&req->rl_registered)))
 			goto badheader;
 		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
 			/* count any expected write chunks in read reply */
 			/* start at write chunk array count */
 			iptr = &headerp->rm_body.rm_chunks[2];
-			rdmalen = rpcrdma_count_chunks(rep,
-						req->rl_nchunks, 1, &iptr);
+			rdmalen = rpcrdma_count_chunks(rep, 1, &iptr);
 			/* check for validity, and no reply chunk after */
 			if (rdmalen < 0 || *iptr++ != xdr_zero)
 				goto badheader;
@@ -988,8 +1005,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
 			status = rep->rr_len;
 		}
-		/* Fix up the rpc results for upper layer */
-		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
+
+		r_xprt->rx_stats.fixup_copy_count +=
+			rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len,
+					     rdmalen);
 		break;
 
 	case rdma_nomsg:
@@ -997,11 +1016,11 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
 		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
 		    headerp->rm_body.rm_chunks[2] != xdr_one ||
-		    req->rl_nchunks == 0)
+		    list_empty(&req->rl_registered))
 			goto badheader;
 		iptr = (__be32 *)((unsigned char *)headerp +
 							RPCRDMA_HDRLEN_MIN);
-		rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
+		rdmalen = rpcrdma_count_chunks(rep, 0, &iptr);
 		if (rdmalen < 0)
 			goto badheader;
 		r_xprt->rx_stats.total_rdma_reply += rdmalen;
@@ -1014,14 +1033,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 
 badheader:
 	default:
-		dprintk("%s: invalid rpcrdma reply header (type %d):"
-				" chunks[012] == %d %d %d"
-				" expected chunks <= %d\n",
-				__func__, be32_to_cpu(headerp->rm_type),
-				headerp->rm_body.rm_chunks[0],
-				headerp->rm_body.rm_chunks[1],
-				headerp->rm_body.rm_chunks[2],
-				req->rl_nchunks);
+		dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
+			rqst->rq_task->tk_pid, __func__,
+			be32_to_cpu(headerp->rm_type));
 		status = -EIO;
 		r_xprt->rx_stats.bad_reply_count++;
 		break;
@@ -1035,7 +1049,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	 * control: waking the next RPC waits until this RPC has
 	 * relinquished all its Send Queue entries.
 	 */
-	if (req->rl_nchunks)
+	if (!list_empty(&req->rl_registered))
 		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
 
 	spin_lock_bh(&xprt->transport_lock);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 99d2e5b72726abd00f1ac5e5732d5fa02119f55a..81f0e879f019e43d35cc9c85ead0dfd17ebc8d30 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -558,7 +558,6 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
 
 out_fail:
 	rpcrdma_buffer_put(req);
-	r_xprt->rx_stats.failed_marshal_count++;
 	return NULL;
 }
 
@@ -590,8 +589,19 @@ xprt_rdma_free(void *buffer)
 	rpcrdma_buffer_put(req);
 }
 
-/*
+/**
+ * xprt_rdma_send_request - marshal and send an RPC request
+ * @task: RPC task with an RPC message in rq_snd_buf
+ *
+ * Return values:
+ *        0:	The request has been sent
+ * ENOTCONN:	Caller needs to invoke connect logic then call again
+ *  ENOBUFS:	Call again later to send the request
+ *      EIO:	A permanent error occurred. The request was not sent,
+ *		and don't try it again
+ *
  * send_request invokes the meat of RPC RDMA. It must do the following:
+ *
  *  1.  Marshal the RPC request into an RPC RDMA request, which means
  *	putting a header in front of data, and creating IOVs for RDMA
  *	from those in the request.
@@ -600,7 +610,6 @@ xprt_rdma_free(void *buffer)
  *	the request (rpcrdma_ep_post).
  *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
  */
-
 static int
 xprt_rdma_send_request(struct rpc_task *task)
 {
@@ -610,6 +619,9 @@ xprt_rdma_send_request(struct rpc_task *task)
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	int rc = 0;
 
+	/* On retransmit, remove any previously registered chunks */
+	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
+
 	rc = rpcrdma_marshal_req(rqst);
 	if (rc < 0)
 		goto failed_marshal;
@@ -630,11 +642,12 @@ xprt_rdma_send_request(struct rpc_task *task)
 	return 0;
 
 failed_marshal:
-	r_xprt->rx_stats.failed_marshal_count++;
 	dprintk("RPC:       %s: rpcrdma_marshal_req failed, status %i\n",
 		__func__, rc);
 	if (rc == -EIO)
-		return -EIO;
+		r_xprt->rx_stats.failed_marshal_count++;
+	if (rc != -ENOTCONN)
+		return rc;
 drop_connection:
 	xprt_disconnect_done(xprt);
 	return -ENOTCONN;	/* implies disconnect */
@@ -660,7 +673,7 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 		   xprt->stat.bad_xids,
 		   xprt->stat.req_u,
 		   xprt->stat.bklog_u);
-	seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu\n",
+	seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
 		   r_xprt->rx_stats.read_chunk_count,
 		   r_xprt->rx_stats.write_chunk_count,
 		   r_xprt->rx_stats.reply_chunk_count,
@@ -672,6 +685,10 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 		   r_xprt->rx_stats.failed_marshal_count,
 		   r_xprt->rx_stats.bad_reply_count,
 		   r_xprt->rx_stats.nomsg_call_count);
+	seq_printf(seq, "%lu %lu %lu\n",
+		   r_xprt->rx_stats.mrs_recovered,
+		   r_xprt->rx_stats.mrs_orphaned,
+		   r_xprt->rx_stats.mrs_allocated);
 }
 
 static int
@@ -741,7 +758,6 @@ void xprt_rdma_cleanup(void)
 			__func__, rc);
 
 	rpcrdma_destroy_wq();
-	frwr_destroy_recovery_wq();
 
 	rc = xprt_unregister_transport(&xprt_rdma_bc);
 	if (rc)
@@ -753,20 +769,13 @@ int xprt_rdma_init(void)
 {
 	int rc;
 
-	rc = frwr_alloc_recovery_wq();
-	if (rc)
-		return rc;
-
 	rc = rpcrdma_alloc_wq();
-	if (rc) {
-		frwr_destroy_recovery_wq();
+	if (rc)
 		return rc;
-	}
 
 	rc = xprt_register_transport(&xprt_rdma);
 	if (rc) {
 		rpcrdma_destroy_wq();
-		frwr_destroy_recovery_wq();
 		return rc;
 	}
 
@@ -774,7 +783,6 @@ int xprt_rdma_init(void)
 	if (rc) {
 		xprt_unregister_transport(&xprt_rdma);
 		rpcrdma_destroy_wq();
-		frwr_destroy_recovery_wq();
 		return rc;
 	}
 
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index b044d98a1370207422d129689bb43b35766fc76f..a74d79d473c8366c907b2e2bb7bcaba63e9572d9 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -379,8 +379,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 	struct rpcrdma_ia *ia = &xprt->rx_ia;
 	int rc;
 
-	ia->ri_dma_mr = NULL;
-
 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
 	if (IS_ERR(ia->ri_id)) {
 		rc = PTR_ERR(ia->ri_id);
@@ -391,47 +389,29 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 	ia->ri_pd = ib_alloc_pd(ia->ri_device);
 	if (IS_ERR(ia->ri_pd)) {
 		rc = PTR_ERR(ia->ri_pd);
-		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
-			__func__, rc);
+		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
 		goto out2;
 	}
 
-	if (memreg == RPCRDMA_FRMR) {
-		if (!(ia->ri_device->attrs.device_cap_flags &
-				IB_DEVICE_MEM_MGT_EXTENSIONS) ||
-		    (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) {
-			dprintk("RPC:       %s: FRMR registration "
-				"not supported by HCA\n", __func__);
-			memreg = RPCRDMA_MTHCAFMR;
-		}
-	}
-	if (memreg == RPCRDMA_MTHCAFMR) {
-		if (!ia->ri_device->alloc_fmr) {
-			dprintk("RPC:       %s: MTHCAFMR registration "
-				"not supported by HCA\n", __func__);
-			rc = -EINVAL;
-			goto out3;
-		}
-	}
-
 	switch (memreg) {
 	case RPCRDMA_FRMR:
-		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
-		break;
-	case RPCRDMA_ALLPHYSICAL:
-		ia->ri_ops = &rpcrdma_physical_memreg_ops;
-		break;
+		if (frwr_is_supported(ia)) {
+			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
+			break;
+		}
+		/*FALLTHROUGH*/
 	case RPCRDMA_MTHCAFMR:
-		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
-		break;
+		if (fmr_is_supported(ia)) {
+			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
+			break;
+		}
+		/*FALLTHROUGH*/
 	default:
-		printk(KERN_ERR "RPC: Unsupported memory "
-				"registration mode: %d\n", memreg);
-		rc = -ENOMEM;
+		pr_err("rpcrdma: Unsupported memory registration mode: %d\n",
+		       memreg);
+		rc = -EINVAL;
 		goto out3;
 	}
-	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
-		__func__, ia->ri_ops->ro_displayname);
 
 	return 0;
 
@@ -585,8 +565,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 out2:
 	ib_free_cq(sendcq);
 out1:
-	if (ia->ri_dma_mr)
-		ib_dereg_mr(ia->ri_dma_mr);
 	return rc;
 }
 
@@ -600,8 +578,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 void
 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
-	int rc;
-
 	dprintk("RPC:       %s: entering, connected is %d\n",
 		__func__, ep->rep_connected);
 
@@ -615,12 +591,6 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 
 	ib_free_cq(ep->rep_attr.recv_cq);
 	ib_free_cq(ep->rep_attr.send_cq);
-
-	if (ia->ri_dma_mr) {
-		rc = ib_dereg_mr(ia->ri_dma_mr);
-		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
-			__func__, rc);
-	}
 }
 
 /*
@@ -777,6 +747,90 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	ib_drain_qp(ia->ri_id->qp);
 }
 
+static void
+rpcrdma_mr_recovery_worker(struct work_struct *work)
+{
+	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+						  rb_recovery_worker.work);
+	struct rpcrdma_mw *mw;
+
+	spin_lock(&buf->rb_recovery_lock);
+	while (!list_empty(&buf->rb_stale_mrs)) {
+		mw = list_first_entry(&buf->rb_stale_mrs,
+				      struct rpcrdma_mw, mw_list);
+		list_del_init(&mw->mw_list);
+		spin_unlock(&buf->rb_recovery_lock);
+
+		dprintk("RPC:       %s: recovering MR %p\n", __func__, mw);
+		mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);
+
+		spin_lock(&buf->rb_recovery_lock);
+	};
+	spin_unlock(&buf->rb_recovery_lock);
+}
+
+void
+rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
+{
+	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+	spin_lock(&buf->rb_recovery_lock);
+	list_add(&mw->mw_list, &buf->rb_stale_mrs);
+	spin_unlock(&buf->rb_recovery_lock);
+
+	schedule_delayed_work(&buf->rb_recovery_worker, 0);
+}
+
+static void
+rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt)
+{
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	unsigned int count;
+	LIST_HEAD(free);
+	LIST_HEAD(all);
+
+	for (count = 0; count < 32; count++) {
+		struct rpcrdma_mw *mw;
+		int rc;
+
+		mw = kzalloc(sizeof(*mw), GFP_KERNEL);
+		if (!mw)
+			break;
+
+		rc = ia->ri_ops->ro_init_mr(ia, mw);
+		if (rc) {
+			kfree(mw);
+			break;
+		}
+
+		mw->mw_xprt = r_xprt;
+
+		list_add(&mw->mw_list, &free);
+		list_add(&mw->mw_all, &all);
+	}
+
+	spin_lock(&buf->rb_mwlock);
+	list_splice(&free, &buf->rb_mws);
+	list_splice(&all, &buf->rb_all);
+	r_xprt->rx_stats.mrs_allocated += count;
+	spin_unlock(&buf->rb_mwlock);
+
+	dprintk("RPC:       %s: created %u MRs\n", __func__, count);
+}
+
+static void
+rpcrdma_mr_refresh_worker(struct work_struct *work)
+{
+	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+						  rb_refresh_worker.work);
+	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
+						   rx_buf);
+
+	rpcrdma_create_mrs(r_xprt);
+}
+
 struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
@@ -793,6 +847,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 	spin_unlock(&buffer->rb_reqslock);
 	req->rl_cqe.done = rpcrdma_wc_send;
 	req->rl_buffer = &r_xprt->rx_buf;
+	INIT_LIST_HEAD(&req->rl_registered);
 	return req;
 }
 
@@ -832,17 +887,23 @@ int
 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	int i, rc;
 
 	buf->rb_max_requests = r_xprt->rx_data.max_requests;
 	buf->rb_bc_srv_max_requests = 0;
-	spin_lock_init(&buf->rb_lock);
 	atomic_set(&buf->rb_credits, 1);
+	spin_lock_init(&buf->rb_mwlock);
+	spin_lock_init(&buf->rb_lock);
+	spin_lock_init(&buf->rb_recovery_lock);
+	INIT_LIST_HEAD(&buf->rb_mws);
+	INIT_LIST_HEAD(&buf->rb_all);
+	INIT_LIST_HEAD(&buf->rb_stale_mrs);
+	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
+			  rpcrdma_mr_refresh_worker);
+	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
+			  rpcrdma_mr_recovery_worker);
 
-	rc = ia->ri_ops->ro_init(r_xprt);
-	if (rc)
-		goto out;
+	rpcrdma_create_mrs(r_xprt);
 
 	INIT_LIST_HEAD(&buf->rb_send_bufs);
 	INIT_LIST_HEAD(&buf->rb_allreqs);
@@ -862,7 +923,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 	}
 
 	INIT_LIST_HEAD(&buf->rb_recv_bufs);
-	for (i = 0; i < buf->rb_max_requests + 2; i++) {
+	for (i = 0; i < buf->rb_max_requests; i++) {
 		struct rpcrdma_rep *rep;
 
 		rep = rpcrdma_create_rep(r_xprt);
@@ -918,11 +979,39 @@ rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 	kfree(req);
 }
 
+static void
+rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
+{
+	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
+						   rx_buf);
+	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+	struct rpcrdma_mw *mw;
+	unsigned int count;
+
+	count = 0;
+	spin_lock(&buf->rb_mwlock);
+	while (!list_empty(&buf->rb_all)) {
+		mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
+		list_del(&mw->mw_all);
+
+		spin_unlock(&buf->rb_mwlock);
+		ia->ri_ops->ro_release_mr(mw);
+		count++;
+		spin_lock(&buf->rb_mwlock);
+	}
+	spin_unlock(&buf->rb_mwlock);
+	r_xprt->rx_stats.mrs_allocated = 0;
+
+	dprintk("RPC:       %s: released %u MRs\n", __func__, count);
+}
+
 void
 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
 
+	cancel_delayed_work_sync(&buf->rb_recovery_worker);
+
 	while (!list_empty(&buf->rb_recv_bufs)) {
 		struct rpcrdma_rep *rep;
 
@@ -944,7 +1033,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 	}
 	spin_unlock(&buf->rb_reqslock);
 
-	ia->ri_ops->ro_destroy(buf);
+	rpcrdma_destroy_mrs(buf);
 }
 
 struct rpcrdma_mw *
@@ -962,8 +1051,17 @@ rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
 	spin_unlock(&buf->rb_mwlock);
 
 	if (!mw)
-		pr_err("RPC:       %s: no MWs available\n", __func__);
+		goto out_nomws;
 	return mw;
+
+out_nomws:
+	dprintk("RPC:       %s: no MWs available\n", __func__);
+	schedule_delayed_work(&buf->rb_refresh_worker, 0);
+
+	/* Allow the reply handler and refresh worker to run */
+	cond_resched();
+
+	return NULL;
 }
 
 void
@@ -978,8 +1076,6 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
 
 /*
  * Get a set of request/reply buffers.
- *
- * Reply buffer (if available) is attached to send buffer upon return.
  */
 struct rpcrdma_req *
 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
@@ -998,13 +1094,13 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
 
 out_reqbuf:
 	spin_unlock(&buffers->rb_lock);
-	pr_warn("RPC:       %s: out of request buffers\n", __func__);
+	pr_warn("rpcrdma: out of request buffers (%p)\n", buffers);
 	return NULL;
 out_repbuf:
+	list_add(&req->rl_free, &buffers->rb_send_bufs);
 	spin_unlock(&buffers->rb_lock);
-	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
-	req->rl_reply = NULL;
-	return req;
+	pr_warn("rpcrdma: out of reply buffers (%p)\n", buffers);
+	return NULL;
 }
 
 /*
@@ -1060,14 +1156,6 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
  */
 
-void
-rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
-{
-	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
-		seg->mr_offset,
-		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
-}
-
 /**
  * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
  * @ia: controlling rpcrdma_ia
@@ -1150,7 +1238,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 	if (rep) {
 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
 		if (rc)
-			goto out;
+			return rc;
 		req->rl_reply = NULL;
 	}
 
@@ -1175,10 +1263,12 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 
 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
 	if (rc)
-		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
-			rc);
-out:
-	return rc;
+		goto out_postsend_err;
+	return 0;
+
+out_postsend_err:
+	pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
+	return -ENOTCONN;
 }
 
 /*
@@ -1203,11 +1293,13 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
 				   DMA_BIDIRECTIONAL);
 
 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
-
 	if (rc)
-		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
-			rc);
-	return rc;
+		goto out_postrecv;
+	return 0;
+
+out_postrecv:
+	pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
+	return -ENOTCONN;
 }
 
 /**
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 95cdc66225ee1f52542119b7a2e152b888cf46b0..670fad57153a109b5f8d14ac9cd3733e874c81d3 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -68,7 +68,6 @@ struct rpcrdma_ia {
 	struct ib_device	*ri_device;
 	struct rdma_cm_id 	*ri_id;
 	struct ib_pd		*ri_pd;
-	struct ib_mr		*ri_dma_mr;
 	struct completion	ri_done;
 	int			ri_async_rc;
 	unsigned int		ri_max_frmr_depth;
@@ -172,23 +171,14 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
  *   o recv buffer (posted to provider)
  *   o ib_sge (also donated to provider)
  *   o status of reply (length, success or not)
- *   o bookkeeping state to get run by tasklet (list, etc)
+ *   o bookkeeping state to get run by reply handler (list, etc)
  *
- * These are allocated during initialization, per-transport instance;
- * however, the tasklet execution list itself is global, as it should
- * always be pretty short.
+ * These are allocated during initialization, per-transport instance.
  *
  * N of these are associated with a transport instance, and stored in
  * struct rpcrdma_buffer. N is the max number of outstanding requests.
  */
 
-#define RPCRDMA_MAX_DATA_SEGS	((1 * 1024 * 1024) / PAGE_SIZE)
-
-/* data segments + head/tail for Call + head/tail for Reply */
-#define RPCRDMA_MAX_SEGS 	(RPCRDMA_MAX_DATA_SEGS + 4)
-
-struct rpcrdma_buffer;
-
 struct rpcrdma_rep {
 	struct ib_cqe		rr_cqe;
 	unsigned int		rr_len;
@@ -221,9 +211,6 @@ enum rpcrdma_frmr_state {
 };
 
 struct rpcrdma_frmr {
-	struct scatterlist		*fr_sg;
-	int				fr_nents;
-	enum dma_data_direction		fr_dir;
 	struct ib_mr			*fr_mr;
 	struct ib_cqe			fr_cqe;
 	enum rpcrdma_frmr_state		fr_state;
@@ -235,18 +222,23 @@ struct rpcrdma_frmr {
 };
 
 struct rpcrdma_fmr {
-	struct ib_fmr		*fmr;
-	u64			*physaddrs;
+	struct ib_fmr		*fm_mr;
+	u64			*fm_physaddrs;
 };
 
 struct rpcrdma_mw {
+	struct list_head	mw_list;
+	struct scatterlist	*mw_sg;
+	int			mw_nents;
+	enum dma_data_direction	mw_dir;
 	union {
 		struct rpcrdma_fmr	fmr;
 		struct rpcrdma_frmr	frmr;
 	};
-	struct work_struct	mw_work;
 	struct rpcrdma_xprt	*mw_xprt;
-	struct list_head	mw_list;
+	u32			mw_handle;
+	u32			mw_length;
+	u64			mw_offset;
 	struct list_head	mw_all;
 };
 
@@ -266,33 +258,30 @@ struct rpcrdma_mw {
  * of iovs for send operations. The reason is that the iovs passed to
  * ib_post_{send,recv} must not be modified until the work request
  * completes.
- *
- * NOTES:
- *   o RPCRDMA_MAX_SEGS is the max number of addressible chunk elements we
- *     marshal. The number needed varies depending on the iov lists that
- *     are passed to us, the memory registration mode we are in, and if
- *     physical addressing is used, the layout.
  */
 
+/* Maximum number of page-sized "segments" per chunk list to be
+ * registered or invalidated. Must handle a Reply chunk:
+ */
+enum {
+	RPCRDMA_MAX_IOV_SEGS	= 3,
+	RPCRDMA_MAX_DATA_SEGS	= ((1 * 1024 * 1024) / PAGE_SIZE) + 1,
+	RPCRDMA_MAX_SEGS	= RPCRDMA_MAX_DATA_SEGS +
+				  RPCRDMA_MAX_IOV_SEGS,
+};
+
 struct rpcrdma_mr_seg {		/* chunk descriptors */
-	struct rpcrdma_mw *rl_mw;	/* registered MR */
-	u64		mr_base;	/* registration result */
-	u32		mr_rkey;	/* registration result */
 	u32		mr_len;		/* length of chunk or segment */
-	int		mr_nsegs;	/* number of segments in chunk or 0 */
-	enum dma_data_direction	mr_dir;	/* segment mapping direction */
-	dma_addr_t	mr_dma;		/* segment mapping address */
-	size_t		mr_dmalen;	/* segment mapping length */
 	struct page	*mr_page;	/* owning page, if any */
 	char		*mr_offset;	/* kva if no page, else offset */
 };
 
 #define RPCRDMA_MAX_IOVS	(2)
 
+struct rpcrdma_buffer;
 struct rpcrdma_req {
 	struct list_head	rl_free;
 	unsigned int		rl_niovs;
-	unsigned int		rl_nchunks;
 	unsigned int		rl_connect_cookie;
 	struct rpc_task		*rl_task;
 	struct rpcrdma_buffer	*rl_buffer;
@@ -300,12 +289,13 @@ struct rpcrdma_req {
 	struct ib_sge		rl_send_iov[RPCRDMA_MAX_IOVS];
 	struct rpcrdma_regbuf	*rl_rdmabuf;
 	struct rpcrdma_regbuf	*rl_sendbuf;
-	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
-	struct rpcrdma_mr_seg	*rl_nextseg;
 
 	struct ib_cqe		rl_cqe;
 	struct list_head	rl_all;
 	bool			rl_backchannel;
+
+	struct list_head	rl_registered;	/* registered segments */
+	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
 };
 
 static inline struct rpcrdma_req *
@@ -341,6 +331,11 @@ struct rpcrdma_buffer {
 	struct list_head	rb_allreqs;
 
 	u32			rb_bc_max_requests;
+
+	spinlock_t		rb_recovery_lock; /* protect rb_stale_mrs */
+	struct list_head	rb_stale_mrs;
+	struct delayed_work	rb_recovery_worker;
+	struct delayed_work	rb_refresh_worker;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
 
@@ -387,6 +382,9 @@ struct rpcrdma_stats {
 	unsigned long		bad_reply_count;
 	unsigned long		nomsg_call_count;
 	unsigned long		bcall_count;
+	unsigned long		mrs_recovered;
+	unsigned long		mrs_orphaned;
+	unsigned long		mrs_allocated;
 };
 
 /*
@@ -395,23 +393,25 @@ struct rpcrdma_stats {
 struct rpcrdma_xprt;
 struct rpcrdma_memreg_ops {
 	int		(*ro_map)(struct rpcrdma_xprt *,
-				  struct rpcrdma_mr_seg *, int, bool);
+				  struct rpcrdma_mr_seg *, int, bool,
+				  struct rpcrdma_mw **);
 	void		(*ro_unmap_sync)(struct rpcrdma_xprt *,
 					 struct rpcrdma_req *);
 	void		(*ro_unmap_safe)(struct rpcrdma_xprt *,
 					 struct rpcrdma_req *, bool);
+	void		(*ro_recover_mr)(struct rpcrdma_mw *);
 	int		(*ro_open)(struct rpcrdma_ia *,
 				   struct rpcrdma_ep *,
 				   struct rpcrdma_create_data_internal *);
 	size_t		(*ro_maxpages)(struct rpcrdma_xprt *);
-	int		(*ro_init)(struct rpcrdma_xprt *);
-	void		(*ro_destroy)(struct rpcrdma_buffer *);
+	int		(*ro_init_mr)(struct rpcrdma_ia *,
+				      struct rpcrdma_mw *);
+	void		(*ro_release_mr)(struct rpcrdma_mw *);
 	const char	*ro_displayname;
 };
 
 extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops;
 extern const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops;
-extern const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops;
 
 /*
  * RPCRDMA transport -- encapsulates the structures above for
@@ -446,6 +446,8 @@ extern int xprt_rdma_pad_optimize;
  */
 int rpcrdma_ia_open(struct rpcrdma_xprt *, struct sockaddr *, int);
 void rpcrdma_ia_close(struct rpcrdma_ia *);
+bool frwr_is_supported(struct rpcrdma_ia *);
+bool fmr_is_supported(struct rpcrdma_ia *);
 
 /*
  * Endpoint calls - xprtrdma/verbs.c
@@ -477,6 +479,8 @@ void rpcrdma_buffer_put(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
 
+void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);
+
 struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
 					    size_t, gfp_t);
 void rpcrdma_free_regbuf(struct rpcrdma_ia *,
@@ -484,9 +488,6 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
 
 int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
 
-int frwr_alloc_recovery_wq(void);
-void frwr_destroy_recovery_wq(void);
-
 int rpcrdma_alloc_wq(void);
 void rpcrdma_destroy_wq(void);
 
@@ -494,45 +495,12 @@ void rpcrdma_destroy_wq(void);
  * Wrappers for chunk registration, shared by read/write chunk code.
  */
 
-void rpcrdma_mapping_error(struct rpcrdma_mr_seg *);
-
 static inline enum dma_data_direction
 rpcrdma_data_dir(bool writing)
 {
 	return writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
 }
 
-static inline void
-rpcrdma_map_one(struct ib_device *device, struct rpcrdma_mr_seg *seg,
-		enum dma_data_direction direction)
-{
-	seg->mr_dir = direction;
-	seg->mr_dmalen = seg->mr_len;
-
-	if (seg->mr_page)
-		seg->mr_dma = ib_dma_map_page(device,
-				seg->mr_page, offset_in_page(seg->mr_offset),
-				seg->mr_dmalen, seg->mr_dir);
-	else
-		seg->mr_dma = ib_dma_map_single(device,
-				seg->mr_offset,
-				seg->mr_dmalen, seg->mr_dir);
-
-	if (ib_dma_mapping_error(device, seg->mr_dma))
-		rpcrdma_mapping_error(seg);
-}
-
-static inline void
-rpcrdma_unmap_one(struct ib_device *device, struct rpcrdma_mr_seg *seg)
-{
-	if (seg->mr_page)
-		ib_dma_unmap_page(device,
-				  seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
-	else
-		ib_dma_unmap_single(device,
-				    seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
-}
-
 /*
  * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
  */