提交 62164f31 编写于 作者: O Olga Kornievskaia 提交者: Anna Schumaker

NFS add support for asynchronous COPY

Change xdr to always send COPY asynchronously.

Keep the list copies send in a list under a server structure.
Once copy is sent, it waits on a completion structure that will
be signalled by the callback thread that receives CB_OFFLOAD.

If CB_OFFLOAD returned an error and even if it returned partial
bytes, ignore them (as we can't commit without a verifier to
match) and return an error.
Signed-off-by: NOlga Kornievskaia <kolga@netapp.com>
Signed-off-by: NAnna Schumaker <Anna.Schumaker@Netapp.com>
上级 67aa7444
......@@ -662,9 +662,45 @@ __be32 nfs4_callback_notify_lock(void *argp, void *resp,
}
#endif /* CONFIG_NFS_V4_1 */
#ifdef CONFIG_NFS_V4_2
__be32 nfs4_callback_offload(void *args, void *dummy,
static void nfs4_copy_cb_args(struct nfs4_copy_state *cp_state,
struct cb_offloadargs *args)
{
cp_state->count = args->wr_count;
cp_state->error = args->error;
if (!args->error) {
cp_state->verf.committed = args->wr_writeverf.committed;
memcpy(&cp_state->verf.verifier.data[0],
&args->wr_writeverf.verifier.data[0],
NFS4_VERIFIER_SIZE);
}
}
__be32 nfs4_callback_offload(void *data, void *dummy,
struct cb_process_state *cps)
{
struct cb_offloadargs *args = data;
struct nfs_server *server;
struct nfs4_copy_state *copy;
rcu_read_lock();
list_for_each_entry_rcu(server, &cps->clp->cl_superblocks,
client_link) {
spin_lock(&server->nfs_client->cl_lock);
list_for_each_entry(copy, &server->ss_copies, copies) {
if (memcmp(args->coa_stateid.other,
copy->stateid.other,
sizeof(args->coa_stateid.other)))
continue;
nfs4_copy_cb_args(copy, args);
complete(&copy->completion);
spin_unlock(&server->nfs_client->cl_lock);
goto out;
}
spin_unlock(&server->nfs_client->cl_lock);
}
out:
rcu_read_unlock();
return 0;
}
#endif /* CONFIG_NFS_V4_2 */
......@@ -886,6 +886,7 @@ struct nfs_server *nfs_alloc_server(void)
INIT_LIST_HEAD(&server->delegations);
INIT_LIST_HEAD(&server->layouts);
INIT_LIST_HEAD(&server->state_owners_lru);
INIT_LIST_HEAD(&server->ss_copies);
atomic_set(&server->active, 0);
......
......@@ -130,6 +130,37 @@ int nfs42_proc_deallocate(struct file *filep, loff_t offset, loff_t len)
return err;
}
static int handle_async_copy(struct nfs42_copy_res *res,
struct nfs_server *server,
struct file *src,
struct file *dst,
nfs4_stateid *src_stateid)
{
struct nfs4_copy_state *copy;
int status = NFS4_OK;
copy = kzalloc(sizeof(struct nfs4_copy_state), GFP_NOFS);
if (!copy)
return -ENOMEM;
memcpy(&copy->stateid, &res->write_res.stateid, NFS4_STATEID_SIZE);
init_completion(&copy->completion);
spin_lock(&server->nfs_client->cl_lock);
list_add_tail(&copy->copies, &server->ss_copies);
spin_unlock(&server->nfs_client->cl_lock);
wait_for_completion_interruptible(&copy->completion);
spin_lock(&server->nfs_client->cl_lock);
list_del_init(&copy->copies);
spin_unlock(&server->nfs_client->cl_lock);
res->write_res.count = copy->count;
memcpy(&res->write_res.verifier, &copy->verf, sizeof(copy->verf));
status = -copy->error;
kfree(copy);
return status;
}
static ssize_t _nfs42_proc_copy(struct file *src,
struct nfs_lock_context *src_lock,
struct file *dst,
......@@ -168,9 +199,13 @@ static ssize_t _nfs42_proc_copy(struct file *src,
if (status)
return status;
res->commit_res.verf = kzalloc(sizeof(struct nfs_writeverf), GFP_NOFS);
if (!res->commit_res.verf)
return -ENOMEM;
res->commit_res.verf = NULL;
if (args->sync) {
res->commit_res.verf =
kzalloc(sizeof(struct nfs_writeverf), GFP_NOFS);
if (!res->commit_res.verf)
return -ENOMEM;
}
status = nfs4_call_sync(server->client, server, &msg,
&args->seq_args, &res->seq_res, 0);
if (status == -ENOTSUPP)
......@@ -178,18 +213,27 @@ static ssize_t _nfs42_proc_copy(struct file *src,
if (status)
goto out;
if (nfs_write_verifier_cmp(&res->write_res.verifier.verifier,
if (args->sync &&
nfs_write_verifier_cmp(&res->write_res.verifier.verifier,
&res->commit_res.verf->verifier)) {
status = -EAGAIN;
goto out;
}
if (!res->synchronous) {
status = handle_async_copy(res, server, src, dst,
&args->src_stateid);
if (status)
return status;
}
truncate_pagecache_range(dst_inode, pos_dst,
pos_dst + res->write_res.count);
status = res->write_res.count;
out:
kfree(res->commit_res.verf);
if (args->sync)
kfree(res->commit_res.verf);
return status;
}
......@@ -206,6 +250,7 @@ ssize_t nfs42_proc_copy(struct file *src, loff_t pos_src,
.dst_fh = NFS_FH(file_inode(dst)),
.dst_pos = pos_dst,
.count = count,
.sync = false,
};
struct nfs42_copy_res res;
struct nfs4_exception src_exception = {
......
......@@ -150,7 +150,7 @@ static void encode_copy(struct xdr_stream *xdr,
encode_uint64(xdr, args->count);
encode_uint32(xdr, 1); /* consecutive = true */
encode_uint32(xdr, 1); /* synchronous = true */
encode_uint32(xdr, args->sync);
encode_uint32(xdr, 0); /* src server list */
}
......@@ -273,7 +273,8 @@ static void nfs4_xdr_enc_copy(struct rpc_rqst *req,
encode_savefh(xdr, &hdr);
encode_putfh(xdr, args->dst_fh, &hdr);
encode_copy(xdr, args, &hdr);
encode_copy_commit(xdr, args, &hdr);
if (args->sync)
encode_copy_commit(xdr, args, &hdr);
encode_nops(&hdr);
}
......@@ -551,7 +552,8 @@ static int nfs4_xdr_dec_copy(struct rpc_rqst *rqstp,
status = decode_copy(xdr, res);
if (status)
goto out;
status = decode_commit(xdr, &res->commit_res);
if (res->commit_res.verf)
status = decode_commit(xdr, &res->commit_res);
out:
return status;
}
......
......@@ -185,6 +185,15 @@ struct nfs_inode {
struct inode vfs_inode;
};
struct nfs4_copy_state {
struct list_head copies;
nfs4_stateid stateid;
struct completion completion;
uint64_t count;
struct nfs_writeverf verf;
int error;
};
/*
* Access bit flags
*/
......
......@@ -208,6 +208,7 @@ struct nfs_server {
struct list_head state_owners_lru;
struct list_head layouts;
struct list_head delegations;
struct list_head ss_copies;
unsigned long mig_gen;
unsigned long mig_status;
......
......@@ -1388,6 +1388,7 @@ struct nfs42_copy_args {
u64 dst_pos;
u64 count;
bool sync;
};
struct nfs42_write_res {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册