提交 fcc95f06 编写于 作者: L Linus Torvalds

Merge tag 'ceph-for-5.7-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "The main items are:

   - support for asynchronous create and unlink (Jeff Layton).

     Creates and unlinks are satisfied locally, without waiting for a
     reply from the MDS, provided the client has been granted
     appropriate caps (new in v15.y.z ("Octopus") release). This can be
     a big help for metadata heavy workloads such as tar and rsync.
     Opt-in with the new nowsync mount option.

   - multiple blk-mq queues for rbd (Hannes Reinecke and myself).

     When the driver was converted to blk-mq, we settled on a single
     blk-mq queue because of a global lock in libceph and some other
     technical debt. These have since been addressed, so allocate a
     queue per CPU to enhance parallelism.

   - don't hold onto caps that aren't actually needed (Zheng Yan).

     This has been our long-standing behavior, but it causes issues with
     some active/standby applications (synchronous I/O, stalls if the
     standby goes down, etc).

   - .snap directory timestamps consistent with ceph-fuse (Luis
     Henriques)"

* tag 'ceph-for-5.7-rc1' of git://github.com/ceph/ceph-client: (49 commits)
  ceph: fix snapshot directory timestamps
  ceph: wait for async creating inode before requesting new max size
  ceph: don't skip updating wanted caps when cap is stale
  ceph: request new max size only when there is auth cap
  ceph: cleanup return error of try_get_cap_refs()
  ceph: return ceph_mdsc_do_request() errors from __get_parent()
  ceph: check all mds' caps after page writeback
  ceph: update i_requested_max_size only when sending cap msg to auth mds
  ceph: simplify calling of ceph_get_fmode()
  ceph: remove delay check logic from ceph_check_caps()
  ceph: consider inode's last read/write when calculating wanted caps
  ceph: always renew caps if mds_wanted is insufficient
  ceph: update dentry lease for async create
  ceph: attempt to do async create when possible
  ceph: cache layout in parent dir on first sync create
  ceph: add new MDS req field to hold delegated inode number
  ceph: decode interval_sets for delegated inos
  ceph: make ceph_fill_inode non-static
  ceph: perform asynchronous unlink if we have sufficient caps
  ceph: don't take refs to want mask unless we have all bits
  ...
......@@ -107,17 +107,17 @@ Mount Options
address its connection to the monitor originates from.
wsize=X
Specify the maximum write size in bytes. Default: 16 MB.
Specify the maximum write size in bytes. Default: 64 MB.
rsize=X
Specify the maximum read size in bytes. Default: 16 MB.
Specify the maximum read size in bytes. Default: 64 MB.
rasize=X
Specify the maximum readahead size in bytes. Default: 8 MB.
mount_timeout=X
Specify the timeout value for mount (in seconds), in the case
of a non-responsive Ceph file system. The default is 30
of a non-responsive Ceph file system. The default is 60
seconds.
caps_max=X
......
......@@ -337,10 +337,7 @@ struct rbd_img_request {
u64 snap_id; /* for reads */
struct ceph_snap_context *snapc; /* for writes */
};
union {
struct request *rq; /* block request */
struct rbd_obj_request *obj_request; /* obj req initiator */
};
struct rbd_obj_request *obj_request; /* obj req initiator */
struct list_head lock_item;
struct list_head object_extents; /* obj_req.ex structs */
......@@ -349,7 +346,6 @@ struct rbd_img_request {
struct pending_result pending;
struct work_struct work;
int work_result;
struct kref kref;
};
#define for_each_obj_request(ireq, oreq) \
......@@ -1320,15 +1316,6 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
rbd_assert(img_request != NULL);
dout("%s: img %p (was %d)\n", __func__, img_request,
kref_read(&img_request->kref));
kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
struct rbd_obj_request *obj_request)
{
......@@ -1366,18 +1353,10 @@ static void rbd_osd_submit(struct ceph_osd_request *osd_req)
static void img_request_layered_set(struct rbd_img_request *img_request)
{
set_bit(IMG_REQ_LAYERED, &img_request->flags);
smp_mb();
}
static void img_request_layered_clear(struct rbd_img_request *img_request)
{
clear_bit(IMG_REQ_LAYERED, &img_request->flags);
smp_mb();
}
static bool img_request_layered_test(struct rbd_img_request *img_request)
{
smp_mb();
return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
......@@ -1619,10 +1598,8 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
if (!rbd_dev->parent_spec)
return false;
down_read(&rbd_dev->header_rwsem);
if (rbd_dev->parent_overlap)
counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
up_read(&rbd_dev->header_rwsem);
if (counter < 0)
rbd_warn(rbd_dev, "parent reference overflow");
......@@ -1630,63 +1607,54 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
return counter > 0;
}
/*
* Caller is responsible for filling in the list of object requests
* that comprises the image request, and the Linux request pointer
* (if there is one).
*/
static struct rbd_img_request *rbd_img_request_create(
struct rbd_device *rbd_dev,
enum obj_operation_type op_type,
struct ceph_snap_context *snapc)
static void rbd_img_request_init(struct rbd_img_request *img_request,
struct rbd_device *rbd_dev,
enum obj_operation_type op_type)
{
struct rbd_img_request *img_request;
img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
if (!img_request)
return NULL;
memset(img_request, 0, sizeof(*img_request));
img_request->rbd_dev = rbd_dev;
img_request->op_type = op_type;
if (!rbd_img_is_write(img_request))
img_request->snap_id = rbd_dev->spec->snap_id;
else
img_request->snapc = snapc;
if (rbd_dev_parent_get(rbd_dev))
img_request_layered_set(img_request);
INIT_LIST_HEAD(&img_request->lock_item);
INIT_LIST_HEAD(&img_request->object_extents);
mutex_init(&img_request->state_mutex);
kref_init(&img_request->kref);
}
static void rbd_img_capture_header(struct rbd_img_request *img_req)
{
struct rbd_device *rbd_dev = img_req->rbd_dev;
lockdep_assert_held(&rbd_dev->header_rwsem);
return img_request;
if (rbd_img_is_write(img_req))
img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
else
img_req->snap_id = rbd_dev->spec->snap_id;
if (rbd_dev_parent_get(rbd_dev))
img_request_layered_set(img_req);
}
static void rbd_img_request_destroy(struct kref *kref)
static void rbd_img_request_destroy(struct rbd_img_request *img_request)
{
struct rbd_img_request *img_request;
struct rbd_obj_request *obj_request;
struct rbd_obj_request *next_obj_request;
img_request = container_of(kref, struct rbd_img_request, kref);
dout("%s: img %p\n", __func__, img_request);
WARN_ON(!list_empty(&img_request->lock_item));
for_each_obj_request_safe(img_request, obj_request, next_obj_request)
rbd_img_obj_request_del(img_request, obj_request);
if (img_request_layered_test(img_request)) {
img_request_layered_clear(img_request);
if (img_request_layered_test(img_request))
rbd_dev_parent_put(img_request->rbd_dev);
}
if (rbd_img_is_write(img_request))
ceph_put_snap_context(img_request->snapc);
kmem_cache_free(rbd_img_request_cache, img_request);
if (test_bit(IMG_REQ_CHILD, &img_request->flags))
kmem_cache_free(rbd_img_request_cache, img_request);
}
#define BITS_PER_OBJ 2
......@@ -2849,17 +2817,22 @@ static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
{
struct rbd_img_request *img_req = obj_req->img_request;
struct rbd_device *parent = img_req->rbd_dev->parent;
struct rbd_img_request *child_img_req;
int ret;
child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
OBJ_OP_READ, NULL);
child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
if (!child_img_req)
return -ENOMEM;
rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
__set_bit(IMG_REQ_CHILD, &child_img_req->flags);
child_img_req->obj_request = obj_req;
down_read(&parent->header_rwsem);
rbd_img_capture_header(child_img_req);
up_read(&parent->header_rwsem);
dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
obj_req);
......@@ -2888,7 +2861,7 @@ static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
obj_req->copyup_bvecs);
}
if (ret) {
rbd_img_request_put(child_img_req);
rbd_img_request_destroy(child_img_req);
return ret;
}
......@@ -3647,15 +3620,15 @@ static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
struct rbd_obj_request *obj_req = img_req->obj_request;
rbd_img_request_put(img_req);
rbd_img_request_destroy(img_req);
if (__rbd_obj_handle_request(obj_req, &result)) {
img_req = obj_req->img_request;
goto again;
}
} else {
struct request *rq = img_req->rq;
struct request *rq = blk_mq_rq_from_pdu(img_req);
rbd_img_request_put(img_req);
rbd_img_request_destroy(img_req);
blk_mq_end_request(rq, errno_to_blk_status(result));
}
}
......@@ -4707,84 +4680,36 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
static void rbd_queue_workfn(struct work_struct *work)
{
struct request *rq = blk_mq_rq_from_pdu(work);
struct rbd_device *rbd_dev = rq->q->queuedata;
struct rbd_img_request *img_request;
struct ceph_snap_context *snapc = NULL;
struct rbd_img_request *img_request =
container_of(work, struct rbd_img_request, work);
struct rbd_device *rbd_dev = img_request->rbd_dev;
enum obj_operation_type op_type = img_request->op_type;
struct request *rq = blk_mq_rq_from_pdu(img_request);
u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
u64 length = blk_rq_bytes(rq);
enum obj_operation_type op_type;
u64 mapping_size;
int result;
switch (req_op(rq)) {
case REQ_OP_DISCARD:
op_type = OBJ_OP_DISCARD;
break;
case REQ_OP_WRITE_ZEROES:
op_type = OBJ_OP_ZEROOUT;
break;
case REQ_OP_WRITE:
op_type = OBJ_OP_WRITE;
break;
case REQ_OP_READ:
op_type = OBJ_OP_READ;
break;
default:
dout("%s: non-fs request type %d\n", __func__, req_op(rq));
result = -EIO;
goto err;
}
/* Ignore/skip any zero-length requests */
if (!length) {
dout("%s: zero-length request\n", __func__);
result = 0;
goto err_rq;
}
if (op_type != OBJ_OP_READ) {
if (rbd_is_ro(rbd_dev)) {
rbd_warn(rbd_dev, "%s on read-only mapping",
obj_op_name(op_type));
result = -EIO;
goto err;
}
rbd_assert(!rbd_is_snap(rbd_dev));
}
if (offset && length > U64_MAX - offset + 1) {
rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
length);
result = -EINVAL;
goto err_rq; /* Shouldn't happen */
goto err_img_request;
}
blk_mq_start_request(rq);
down_read(&rbd_dev->header_rwsem);
mapping_size = rbd_dev->mapping.size;
if (op_type != OBJ_OP_READ) {
snapc = rbd_dev->header.snapc;
ceph_get_snap_context(snapc);
}
rbd_img_capture_header(img_request);
up_read(&rbd_dev->header_rwsem);
if (offset + length > mapping_size) {
rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
length, mapping_size);
result = -EIO;
goto err_rq;
}
img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
if (!img_request) {
result = -ENOMEM;
goto err_rq;
goto err_img_request;
}
img_request->rq = rq;
snapc = NULL; /* img_request consumes a ref */
dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
img_request, obj_op_name(op_type), offset, length);
......@@ -4801,23 +4726,51 @@ static void rbd_queue_workfn(struct work_struct *work)
return;
err_img_request:
rbd_img_request_put(img_request);
err_rq:
rbd_img_request_destroy(img_request);
if (result)
rbd_warn(rbd_dev, "%s %llx at %llx result %d",
obj_op_name(op_type), length, offset, result);
ceph_put_snap_context(snapc);
err:
blk_mq_end_request(rq, errno_to_blk_status(result));
}
static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
const struct blk_mq_queue_data *bd)
{
struct request *rq = bd->rq;
struct work_struct *work = blk_mq_rq_to_pdu(rq);
struct rbd_device *rbd_dev = hctx->queue->queuedata;
struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
enum obj_operation_type op_type;
queue_work(rbd_wq, work);
switch (req_op(bd->rq)) {
case REQ_OP_DISCARD:
op_type = OBJ_OP_DISCARD;
break;
case REQ_OP_WRITE_ZEROES:
op_type = OBJ_OP_ZEROOUT;
break;
case REQ_OP_WRITE:
op_type = OBJ_OP_WRITE;
break;
case REQ_OP_READ:
op_type = OBJ_OP_READ;
break;
default:
rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
return BLK_STS_IOERR;
}
rbd_img_request_init(img_req, rbd_dev, op_type);
if (rbd_img_is_write(img_req)) {
if (rbd_is_ro(rbd_dev)) {
rbd_warn(rbd_dev, "%s on read-only mapping",
obj_op_name(img_req->op_type));
return BLK_STS_IOERR;
}
rbd_assert(!rbd_is_snap(rbd_dev));
}
INIT_WORK(&img_req->work, rbd_queue_workfn);
queue_work(rbd_wq, &img_req->work);
return BLK_STS_OK;
}
......@@ -4984,18 +4937,8 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
return ret;
}
static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
unsigned int hctx_idx, unsigned int numa_node)
{
struct work_struct *work = blk_mq_rq_to_pdu(rq);
INIT_WORK(work, rbd_queue_workfn);
return 0;
}
static const struct blk_mq_ops rbd_mq_ops = {
.queue_rq = rbd_queue_rq,
.init_request = rbd_init_request,
};
static int rbd_init_disk(struct rbd_device *rbd_dev)
......@@ -5027,8 +4970,8 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
rbd_dev->tag_set.nr_hw_queues = 1;
rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
if (err)
......
......@@ -159,8 +159,6 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
if (!PagePrivate(page))
return;
ClearPageChecked(page);
dout("%p invalidatepage %p idx %lu full dirty page\n",
inode, page, page->index);
......@@ -182,6 +180,47 @@ static int ceph_releasepage(struct page *page, gfp_t g)
return !PagePrivate(page);
}
/*
* Read some contiguous pages. If we cross a stripe boundary, shorten
* *plen. Return number of bytes read, or error.
*/
static int ceph_sync_readpages(struct ceph_fs_client *fsc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 *plen,
u32 truncate_seq, u64 truncate_size,
struct page **pages, int num_pages,
int page_align)
{
struct ceph_osd_client *osdc = &fsc->client->osdc;
struct ceph_osd_request *req;
int rc = 0;
dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
vino.snap, off, *plen);
req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, truncate_seq, truncate_size,
false);
if (IS_ERR(req))
return PTR_ERR(req);
/* it may be a short read due to an object boundary */
osd_req_op_extent_osd_data_pages(req, 0,
pages, *plen, page_align, false, false);
dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
off, *plen, *plen, page_align);
rc = ceph_osdc_start_request(osdc, req, false);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
ceph_osdc_put_request(req);
dout("readpages result %d\n", rc);
return rc;
}
/*
* read a single page, without unlocking it.
*/
......@@ -218,7 +257,7 @@ static int ceph_do_readpage(struct file *filp, struct page *page)
dout("readpage inode %p file %p page %p index %lu\n",
inode, filp, page, page->index);
err = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
err = ceph_sync_readpages(fsc, ceph_vino(inode),
&ci->i_layout, off, &len,
ci->i_truncate_seq, ci->i_truncate_size,
&page, 1, 0);
......@@ -570,6 +609,47 @@ static u64 get_writepages_data_length(struct inode *inode,
return end > start ? end - start : 0;
}
/*
* do a synchronous write on N pages
*/
static int ceph_sync_writepages(struct ceph_fs_client *fsc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
struct ceph_snap_context *snapc,
u64 off, u64 len,
u32 truncate_seq, u64 truncate_size,
struct timespec64 *mtime,
struct page **pages, int num_pages)
{
struct ceph_osd_client *osdc = &fsc->client->osdc;
struct ceph_osd_request *req;
int rc = 0;
int page_align = off & ~PAGE_MASK;
req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
snapc, truncate_seq, truncate_size,
true);
if (IS_ERR(req))
return PTR_ERR(req);
/* it may be a short write due to an object boundary */
osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
false, false);
dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
req->r_mtime = *mtime;
rc = ceph_osdc_start_request(osdc, req, true);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
ceph_osdc_put_request(req);
if (rc == 0)
rc = len;
dout("writepages result %d\n", rc);
return rc;
}
/*
* Write a single page, but leave the page locked.
*
......@@ -628,7 +708,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
set_page_writeback(page);
err = ceph_osdc_writepages(&fsc->client->osdc, ceph_vino(inode),
err = ceph_sync_writepages(fsc, ceph_vino(inode),
&ci->i_layout, snapc, page_off, len,
ceph_wbc.truncate_seq,
ceph_wbc.truncate_size,
......@@ -1575,7 +1655,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
do {
lock_page(page);
if ((off > size) || (page->mapping != inode->i_mapping)) {
if (page_mkwrite_check_truncate(page, inode) < 0) {
unlock_page(page);
ret = VM_FAULT_NOPAGE;
break;
......
......@@ -32,7 +32,7 @@ struct ceph_fscache_entry {
size_t uniq_len;
/* The following members must be last */
struct ceph_fsid fsid;
char uniquifier[0];
char uniquifier[];
};
static const struct fscache_cookie_def ceph_fscache_fsid_object_def = {
......
此差异已折叠。
......@@ -218,10 +218,10 @@ static int mds_sessions_show(struct seq_file *s, void *ptr)
return 0;
}
CEPH_DEFINE_SHOW_FUNC(mdsmap_show)
CEPH_DEFINE_SHOW_FUNC(mdsc_show)
CEPH_DEFINE_SHOW_FUNC(caps_show)
CEPH_DEFINE_SHOW_FUNC(mds_sessions_show)
DEFINE_SHOW_ATTRIBUTE(mdsmap);
DEFINE_SHOW_ATTRIBUTE(mdsc);
DEFINE_SHOW_ATTRIBUTE(caps);
DEFINE_SHOW_ATTRIBUTE(mds_sessions);
/*
......@@ -281,25 +281,25 @@ void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
0400,
fsc->client->debugfs_dir,
fsc,
&mdsmap_show_fops);
&mdsmap_fops);
fsc->debugfs_mds_sessions = debugfs_create_file("mds_sessions",
0400,
fsc->client->debugfs_dir,
fsc,
&mds_sessions_show_fops);
&mds_sessions_fops);
fsc->debugfs_mdsc = debugfs_create_file("mdsc",
0400,
fsc->client->debugfs_dir,
fsc,
&mdsc_show_fops);
&mdsc_fops);
fsc->debugfs_caps = debugfs_create_file("caps",
0400,
fsc->client->debugfs_dir,
fsc,
&caps_show_fops);
&caps_fops);
}
......
......@@ -335,8 +335,11 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
ctx->pos = 2;
}
/* can we use the dcache? */
spin_lock(&ci->i_ceph_lock);
/* request Fx cap. if have Fx, we don't need to release Fs cap
* for later create/unlink. */
__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_WR);
/* can we use the dcache? */
if (ceph_test_mount_opt(fsc, DCACHE) &&
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
ceph_snap(inode) != CEPH_SNAPDIR &&
......@@ -752,7 +755,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
struct ceph_dentry_info *di = ceph_dentry(dentry);
spin_lock(&ci->i_ceph_lock);
dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags);
dout(" dir %p flags are 0x%lx\n", dir, ci->i_ceph_flags);
if (strncmp(dentry->d_name.name,
fsc->mount_options->snapdir_name,
dentry->d_name.len) &&
......@@ -760,6 +763,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
ceph_test_mount_opt(fsc, DCACHE) &&
__ceph_dir_is_complete(ci) &&
(__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
spin_unlock(&ci->i_ceph_lock);
dout(" dir %p complete, -ENOENT\n", dir);
d_add(dentry, NULL);
......@@ -1036,6 +1040,78 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
return err;
}
static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req)
{
int result = req->r_err ? req->r_err :
le32_to_cpu(req->r_reply_info.head->result);
if (result == -EJUKEBOX)
goto out;
/* If op failed, mark everyone involved for errors */
if (result) {
int pathlen;
u64 base;
char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
&base, 0);
/* mark error on parent + clear complete */
mapping_set_error(req->r_parent->i_mapping, result);
ceph_dir_clear_complete(req->r_parent);
/* drop the dentry -- we don't know its status */
if (!d_unhashed(req->r_dentry))
d_drop(req->r_dentry);
/* mark inode itself for an error (since metadata is bogus) */
mapping_set_error(req->r_old_inode->i_mapping, result);
pr_warn("ceph: async unlink failure path=(%llx)%s result=%d!\n",
base, IS_ERR(path) ? "<<bad>>" : path, result);
ceph_mdsc_free_path(path, pathlen);
}
out:
iput(req->r_old_inode);
ceph_mdsc_release_dir_caps(req);
}
static int get_caps_for_async_unlink(struct inode *dir, struct dentry *dentry)
{
struct ceph_inode_info *ci = ceph_inode(dir);
struct ceph_dentry_info *di;
int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_UNLINK;
spin_lock(&ci->i_ceph_lock);
if ((__ceph_caps_issued(ci, NULL) & want) == want) {
ceph_take_cap_refs(ci, want, false);
got = want;
}
spin_unlock(&ci->i_ceph_lock);
/* If we didn't get anything, return 0 */
if (!got)
return 0;
spin_lock(&dentry->d_lock);
di = ceph_dentry(dentry);
/*
* - We are holding Fx, which implies Fs caps.
* - Only support async unlink for primary linkage
*/
if (atomic_read(&ci->i_shared_gen) != di->lease_shared_gen ||
!(di->flags & CEPH_DENTRY_PRIMARY_LINK))
want = 0;
spin_unlock(&dentry->d_lock);
/* Do we still want what we've got? */
if (want == got)
return got;
ceph_put_cap_refs(ci, got);
return 0;
}
/*
* rmdir and unlink are differ only by the metadata op code
*/
......@@ -1045,6 +1121,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
struct ceph_mds_client *mdsc = fsc->mdsc;
struct inode *inode = d_inode(dentry);
struct ceph_mds_request *req;
bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
int err = -EROFS;
int op;
......@@ -1059,6 +1136,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
} else
goto out;
retry:
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
if (IS_ERR(req)) {
err = PTR_ERR(req);
......@@ -1067,13 +1145,39 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
req->r_parent = dir;
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
req->r_inode_drop = ceph_drop_caps_for_unlink(inode);
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
d_delete(dentry);
if (try_async && op == CEPH_MDS_OP_UNLINK &&
(req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) {
dout("async unlink on %lu/%.*s caps=%s", dir->i_ino,
dentry->d_name.len, dentry->d_name.name,
ceph_cap_string(req->r_dir_caps));
set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
req->r_callback = ceph_async_unlink_cb;
req->r_old_inode = d_inode(dentry);
ihold(req->r_old_inode);
err = ceph_mdsc_submit_request(mdsc, dir, req);
if (!err) {
/*
* We have enough caps, so we assume that the unlink
* will succeed. Fix up the target inode and dcache.
*/
drop_nlink(inode);
d_delete(dentry);
} else if (err == -EJUKEBOX) {
try_async = false;
ceph_mdsc_put_request(req);
goto retry;
}
} else {
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
d_delete(dentry);
}
ceph_mdsc_put_request(req);
out:
return err;
......@@ -1411,6 +1515,7 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry)
spin_lock(&dentry->d_lock);
di->time = jiffies;
di->lease_shared_gen = 0;
di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;
__dentry_lease_unlist(di);
spin_unlock(&dentry->d_lock);
}
......@@ -1520,7 +1625,8 @@ static int __dir_lease_try_check(const struct dentry *dentry)
/*
* Check if directory-wide content lease/cap is valid.
*/
static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry,
struct ceph_mds_client *mdsc)
{
struct ceph_inode_info *ci = ceph_inode(dir);
int valid;
......@@ -1528,7 +1634,10 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
spin_lock(&ci->i_ceph_lock);
valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
shared_gen = atomic_read(&ci->i_shared_gen);
if (valid) {
__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
shared_gen = atomic_read(&ci->i_shared_gen);
}
spin_unlock(&ci->i_ceph_lock);
if (valid) {
struct ceph_dentry_info *di;
......@@ -1554,6 +1663,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
int valid = 0;
struct dentry *parent;
struct inode *dir, *inode;
struct ceph_mds_client *mdsc;
if (flags & LOOKUP_RCU) {
parent = READ_ONCE(dentry->d_parent);
......@@ -1570,6 +1680,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
dout("d_revalidate %p '%pd' inode %p offset 0x%llx\n", dentry,
dentry, inode, ceph_dentry(dentry)->offset);
mdsc = ceph_sb_to_client(dir->i_sb)->mdsc;
/* always trust cached snapped dentries, snapdir dentry */
if (ceph_snap(dir) != CEPH_NOSNAP) {
dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
......@@ -1581,7 +1693,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
valid = dentry_lease_is_valid(dentry, flags);
if (valid == -ECHILD)
return valid;
if (valid || dir_lease_is_valid(dir, dentry)) {
if (valid || dir_lease_is_valid(dir, dentry, mdsc)) {
if (inode)
valid = ceph_is_any_caps(inode);
else
......@@ -1590,8 +1702,6 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
}
if (!valid) {
struct ceph_mds_client *mdsc =
ceph_sb_to_client(dir->i_sb)->mdsc;
struct ceph_mds_request *req;
int op, err;
u32 mask;
......
......@@ -315,6 +315,11 @@ static struct dentry *__get_parent(struct super_block *sb,
req->r_num_caps = 1;
err = ceph_mdsc_do_request(mdsc, NULL, req);
if (err) {
ceph_mdsc_put_request(req);
return ERR_PTR(err);
}
inode = req->r_target_inode;
if (inode)
ihold(inode);
......
此差异已折叠。
......@@ -82,10 +82,14 @@ struct inode *ceph_get_snapdir(struct inode *parent)
inode->i_mode = parent->i_mode;
inode->i_uid = parent->i_uid;
inode->i_gid = parent->i_gid;
inode->i_mtime = parent->i_mtime;
inode->i_ctime = parent->i_ctime;
inode->i_atime = parent->i_atime;
inode->i_op = &ceph_snapdir_iops;
inode->i_fop = &ceph_snapdir_fops;
ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
ci->i_rbytes = 0;
ci->i_btime = ceph_inode(parent)->i_btime;
if (inode->i_state & I_NEW)
unlock_new_inode(inode);
......@@ -447,6 +451,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_max_files = 0;
memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
ci->i_fragtree = RB_ROOT;
......@@ -471,13 +476,13 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_prealloc_cap_flush = NULL;
INIT_LIST_HEAD(&ci->i_cap_flush_list);
init_waitqueue_head(&ci->i_cap_wq);
ci->i_hold_caps_min = 0;
ci->i_hold_caps_max = 0;
INIT_LIST_HEAD(&ci->i_cap_delay_list);
INIT_LIST_HEAD(&ci->i_cap_snaps);
ci->i_head_snapc = NULL;
ci->i_snap_caps = 0;
ci->i_last_rd = ci->i_last_wr = jiffies - 3600 * HZ;
for (i = 0; i < CEPH_FILE_MODE_BITS; i++)
ci->i_nr_by_mode[i] = 0;
......@@ -496,6 +501,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_rdcache_ref = 0;
ci->i_wr_ref = 0;
ci->i_wb_ref = 0;
ci->i_fx_ref = 0;
ci->i_wrbuffer_ref = 0;
ci->i_wrbuffer_ref_head = 0;
atomic_set(&ci->i_filelock_ref, 0);
......@@ -586,6 +592,7 @@ void ceph_evict_inode(struct inode *inode)
ceph_buffer_put(ci->i_xattrs.prealloc_blob);
ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
}
static inline blkcnt_t calc_inode_blocks(u64 size)
......@@ -636,7 +643,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,
if ((issued & (CEPH_CAP_FILE_CACHE|
CEPH_CAP_FILE_BUFFER)) ||
mapping_mapped(inode->i_mapping) ||
__ceph_caps_file_wanted(ci)) {
__ceph_is_file_opened(ci)) {
ci->i_truncate_pending++;
queue_trunc = 1;
}
......@@ -727,11 +734,11 @@ void ceph_fill_file_time(struct inode *inode, int issued,
* Populate an inode based on info from mds. May be called on new or
* existing inodes.
*/
static int fill_inode(struct inode *inode, struct page *locked_page,
struct ceph_mds_reply_info_in *iinfo,
struct ceph_mds_reply_dirfrag *dirinfo,
struct ceph_mds_session *session, int cap_fmode,
struct ceph_cap_reservation *caps_reservation)
int ceph_fill_inode(struct inode *inode, struct page *locked_page,
struct ceph_mds_reply_info_in *iinfo,
struct ceph_mds_reply_dirfrag *dirinfo,
struct ceph_mds_session *session, int cap_fmode,
struct ceph_cap_reservation *caps_reservation)
{
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_mds_reply_inode *info = iinfo->in;
......@@ -748,7 +755,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
bool new_version = false;
bool fill_inline = false;
dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
dout("%s %p ino %llx.%llx v %llu had %llu\n", __func__,
inode, ceph_vinop(inode), le64_to_cpu(info->version),
ci->i_version);
......@@ -769,7 +776,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
if (iinfo->xattr_len > 4) {
xattr_blob = ceph_buffer_new(iinfo->xattr_len, GFP_NOFS);
if (!xattr_blob)
pr_err("fill_inode ENOMEM xattr blob %d bytes\n",
pr_err("%s ENOMEM xattr blob %d bytes\n", __func__,
iinfo->xattr_len);
}
......@@ -932,8 +939,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
spin_unlock(&ci->i_ceph_lock);
if (symlen != i_size_read(inode)) {
pr_err("fill_inode %llx.%llx BAD symlink "
"size %lld\n", ceph_vinop(inode),
pr_err("%s %llx.%llx BAD symlink "
"size %lld\n", __func__,
ceph_vinop(inode),
i_size_read(inode));
i_size_write(inode, symlen);
inode->i_blocks = calc_inode_blocks(symlen);
......@@ -957,7 +965,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
inode->i_fop = &ceph_dir_fops;
break;
default:
pr_err("fill_inode %llx.%llx BAD mode 0%o\n",
pr_err("%s %llx.%llx BAD mode 0%o\n", __func__,
ceph_vinop(inode), inode->i_mode);
}
......@@ -966,7 +974,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
if (ceph_snap(inode) == CEPH_NOSNAP) {
ceph_add_cap(inode, session,
le64_to_cpu(info->cap.cap_id),
cap_fmode, info_caps,
info_caps,
le32_to_cpu(info->cap.wanted),
le32_to_cpu(info->cap.seq),
le32_to_cpu(info->cap.mseq),
......@@ -991,13 +999,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
dout(" %p got snap_caps %s\n", inode,
ceph_cap_string(info_caps));
ci->i_snap_caps |= info_caps;
if (cap_fmode >= 0)
__ceph_get_fmode(ci, cap_fmode);
}
} else if (cap_fmode >= 0) {
pr_warn("mds issued no caps on %llx.%llx\n",
ceph_vinop(inode));
__ceph_get_fmode(ci, cap_fmode);
}
if (iinfo->inline_version > 0 &&
......@@ -1009,6 +1011,13 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
fill_inline = true;
}
if (cap_fmode >= 0) {
if (!info_caps)
pr_warn("mds issued no caps on %llx.%llx\n",
ceph_vinop(inode));
__ceph_touch_fmode(ci, mdsc, cap_fmode);
}
spin_unlock(&ci->i_ceph_lock);
if (fill_inline)
......@@ -1050,6 +1059,7 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
struct ceph_mds_session **old_lease_session)
{
struct ceph_dentry_info *di = ceph_dentry(dentry);
unsigned mask = le16_to_cpu(lease->mask);
long unsigned duration = le32_to_cpu(lease->duration_ms);
long unsigned ttl = from_time + (duration * HZ) / 1000;
long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
......@@ -1061,8 +1071,13 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
if (ceph_snap(dir) != CEPH_NOSNAP)
return;
if (mask & CEPH_LEASE_PRIMARY_LINK)
di->flags |= CEPH_DENTRY_PRIMARY_LINK;
else
di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;
di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
if (duration == 0) {
if (!(mask & CEPH_LEASE_VALID)) {
__ceph_dentry_dir_lease_touch(di);
return;
}
......@@ -1239,10 +1254,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
struct inode *dir = req->r_parent;
if (dir) {
err = fill_inode(dir, NULL,
&rinfo->diri, rinfo->dirfrag,
session, -1,
&req->r_caps_reservation);
err = ceph_fill_inode(dir, NULL, &rinfo->diri,
rinfo->dirfrag, session, -1,
&req->r_caps_reservation);
if (err < 0)
goto done;
} else {
......@@ -1307,13 +1321,14 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
goto done;
}
err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
session,
err = ceph_fill_inode(in, req->r_locked_page, &rinfo->targeti,
NULL, session,
(!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
!test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) &&
rinfo->head->result == 0) ? req->r_fmode : -1,
&req->r_caps_reservation);
if (err < 0) {
pr_err("fill_inode badness %p %llx.%llx\n",
pr_err("ceph_fill_inode badness %p %llx.%llx\n",
in, ceph_vinop(in));
if (in->i_state & I_NEW)
discard_new_inode(in);
......@@ -1500,10 +1515,11 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
dout("new_inode badness got %d\n", err);
continue;
}
rc = fill_inode(in, NULL, &rde->inode, NULL, session,
-1, &req->r_caps_reservation);
rc = ceph_fill_inode(in, NULL, &rde->inode, NULL, session,
-1, &req->r_caps_reservation);
if (rc < 0) {
pr_err("fill_inode badness on %p got %d\n", in, rc);
pr_err("ceph_fill_inode badness on %p got %d\n",
in, rc);
err = rc;
if (in->i_state & I_NEW) {
ihold(in);
......@@ -1707,10 +1723,10 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
}
}
ret = fill_inode(in, NULL, &rde->inode, NULL, session,
-1, &req->r_caps_reservation);
ret = ceph_fill_inode(in, NULL, &rde->inode, NULL, session,
-1, &req->r_caps_reservation);
if (ret < 0) {
pr_err("fill_inode badness on %p\n", in);
pr_err("ceph_fill_inode badness on %p\n", in);
if (d_really_is_negative(dn)) {
/* avoid calling iput_final() in mds
* dispatch threads */
......@@ -1972,7 +1988,7 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
mutex_unlock(&ci->i_truncate_mutex);
if (wrbuffer_refs == 0)
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
ceph_check_caps(ci, 0, NULL);
wake_up_all(&ci->i_cap_wq);
}
......
......@@ -243,11 +243,13 @@ static long ceph_ioctl_lazyio(struct file *file)
struct ceph_file_info *fi = file->private_data;
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) {
spin_lock(&ci->i_ceph_lock);
fi->fmode |= CEPH_FILE_MODE_LAZY;
ci->i_nr_by_mode[ffs(CEPH_FILE_MODE_LAZY)]++;
__ceph_touch_fmode(ci, mdsc, fi->fmode);
spin_unlock(&ci->i_ceph_lock);
dout("ioctl_layzio: file %p marked lazy\n", file);
......
......@@ -210,6 +210,21 @@ static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
return 0;
}
static int try_unlock_file(struct file *file, struct file_lock *fl)
{
int err;
unsigned int orig_flags = fl->fl_flags;
fl->fl_flags |= FL_EXISTS;
err = locks_lock_file_wait(file, fl);
fl->fl_flags = orig_flags;
if (err == -ENOENT) {
if (!(orig_flags & FL_EXISTS))
err = 0;
return err;
}
return 1;
}
/**
* Attempt to set an fcntl lock.
* For now, this just goes away to the server. Later it may be more awesome.
......@@ -255,9 +270,15 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
else
lock_cmd = CEPH_LOCK_UNLOCK;
if (op == CEPH_MDS_OP_SETFILELOCK && F_UNLCK == fl->fl_type) {
err = try_unlock_file(file, fl);
if (err <= 0)
return err;
}
err = ceph_lock_message(CEPH_LOCK_FCNTL, op, inode, lock_cmd, wait, fl);
if (!err) {
if (op == CEPH_MDS_OP_SETFILELOCK) {
if (op == CEPH_MDS_OP_SETFILELOCK && F_UNLCK != fl->fl_type) {
dout("mds locked, locking locally\n");
err = posix_lock_file(file, fl, NULL);
if (err) {
......@@ -311,9 +332,15 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
else
lock_cmd = CEPH_LOCK_UNLOCK;
if (F_UNLCK == fl->fl_type) {
err = try_unlock_file(file, fl);
if (err <= 0)
return err;
}
err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK,
inode, lock_cmd, wait, fl);
if (!err) {
if (!err && F_UNLCK != fl->fl_type) {
err = locks_lock_file_wait(file, fl);
if (err) {
ceph_lock_message(CEPH_LOCK_FLOCK,
......
......@@ -415,21 +415,121 @@ static int parse_reply_info_filelock(void **p, void *end,
return -EIO;
}
#if BITS_PER_LONG == 64
#define DELEGATED_INO_AVAILABLE xa_mk_value(1)
static int ceph_parse_deleg_inos(void **p, void *end,
struct ceph_mds_session *s)
{
u32 sets;
ceph_decode_32_safe(p, end, sets, bad);
dout("got %u sets of delegated inodes\n", sets);
while (sets--) {
u64 start, len, ino;
ceph_decode_64_safe(p, end, start, bad);
ceph_decode_64_safe(p, end, len, bad);
while (len--) {
int err = xa_insert(&s->s_delegated_inos, ino = start++,
DELEGATED_INO_AVAILABLE,
GFP_KERNEL);
if (!err) {
dout("added delegated inode 0x%llx\n",
start - 1);
} else if (err == -EBUSY) {
pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
start - 1);
} else {
return err;
}
}
}
return 0;
bad:
return -EIO;
}
u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
unsigned long ino;
void *val;
xa_for_each(&s->s_delegated_inos, ino, val) {
val = xa_erase(&s->s_delegated_inos, ino);
if (val == DELEGATED_INO_AVAILABLE)
return ino;
}
return 0;
}
int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
* FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
* ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
* and bottom words?
*/
static int ceph_parse_deleg_inos(void **p, void *end,
struct ceph_mds_session *s)
{
u32 sets;
ceph_decode_32_safe(p, end, sets, bad);
if (sets)
ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
return 0;
bad:
return -EIO;
}
u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
return 0;
}
int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
return 0;
}
#endif /* BITS_PER_LONG == 64 */
/*
* parse create results
*/
static int parse_reply_info_create(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
u64 features)
u64 features, struct ceph_mds_session *s)
{
int ret;
if (features == (u64)-1 ||
(features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
/* Malformed reply? */
if (*p == end) {
/* Malformed reply? */
info->has_create_ino = false;
} else {
} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
u8 struct_v, struct_compat;
u32 len;
info->has_create_ino = true;
ceph_decode_8_safe(p, end, struct_v, bad);
ceph_decode_8_safe(p, end, struct_compat, bad);
ceph_decode_32_safe(p, end, len, bad);
ceph_decode_64_safe(p, end, info->ino, bad);
ret = ceph_parse_deleg_inos(p, end, s);
if (ret)
return ret;
} else {
/* legacy */
ceph_decode_64_safe(p, end, info->ino, bad);
info->has_create_ino = true;
}
} else {
if (*p != end)
......@@ -448,7 +548,7 @@ static int parse_reply_info_create(void **p, void *end,
*/
static int parse_reply_info_extra(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
u64 features)
u64 features, struct ceph_mds_session *s)
{
u32 op = le32_to_cpu(info->head->op);
......@@ -457,7 +557,7 @@ static int parse_reply_info_extra(void **p, void *end,
else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
return parse_reply_info_readdir(p, end, info, features);
else if (op == CEPH_MDS_OP_CREATE)
return parse_reply_info_create(p, end, info, features);
return parse_reply_info_create(p, end, info, features, s);
else
return -EIO;
}
......@@ -465,7 +565,7 @@ static int parse_reply_info_extra(void **p, void *end,
/*
* parse entire mds reply
*/
static int parse_reply_info(struct ceph_msg *msg,
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
struct ceph_mds_reply_info_parsed *info,
u64 features)
{
......@@ -490,7 +590,7 @@ static int parse_reply_info(struct ceph_msg *msg,
ceph_decode_32_safe(&p, end, len, bad);
if (len > 0) {
ceph_decode_need(&p, end, len, bad);
err = parse_reply_info_extra(&p, p+len, info, features);
err = parse_reply_info_extra(&p, p+len, info, features, s);
if (err < 0)
goto out_bad;
}
......@@ -558,6 +658,7 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
if (refcount_dec_and_test(&s->s_ref)) {
if (s->s_auth.authorizer)
ceph_auth_destroy_authorizer(s->s_auth.authorizer);
xa_destroy(&s->s_delegated_inos);
kfree(s);
}
}
......@@ -645,6 +746,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
refcount_set(&s->s_ref, 1);
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
xa_init(&s->s_delegated_inos);
s->s_num_cap_releases = 0;
s->s_cap_reconnect = 0;
s->s_cap_iterator = NULL;
......@@ -699,6 +801,7 @@ void ceph_mdsc_release_request(struct kref *kref)
struct ceph_mds_request *req = container_of(kref,
struct ceph_mds_request,
r_kref);
ceph_mdsc_release_dir_caps(req);
destroy_reply_info(&req->r_reply_info);
if (req->r_request)
ceph_msg_put(req->r_request);
......@@ -736,7 +839,7 @@ void ceph_mdsc_release_request(struct kref *kref)
put_request_session(req);
ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
WARN_ON_ONCE(!list_empty(&req->r_wait));
kfree(req);
kmem_cache_free(ceph_mds_request_cachep, req);
}
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
......@@ -793,8 +896,13 @@ static void __register_request(struct ceph_mds_client *mdsc,
mdsc->oldest_tid = req->r_tid;
if (dir) {
struct ceph_inode_info *ci = ceph_inode(dir);
ihold(dir);
req->r_unsafe_dir = dir;
spin_lock(&ci->i_unsafe_lock);
list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
spin_unlock(&ci->i_unsafe_lock);
}
}
......@@ -822,8 +930,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
erase_request(&mdsc->request_tree, req);
if (req->r_unsafe_dir &&
test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
if (req->r_unsafe_dir) {
struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
spin_lock(&ci->i_unsafe_lock);
list_del_init(&req->r_unsafe_dir_item);
......@@ -1407,8 +1514,6 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
spin_lock(&ci->i_ceph_lock);
if (cap->mds_wanted | cap->issued)
ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
__ceph_remove_cap(cap, false);
if (!ci->i_auth_cap) {
struct ceph_cap_flush *cf;
......@@ -1574,9 +1679,6 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
/* mds did not re-issue stale cap */
spin_lock(&ci->i_ceph_lock);
cap->issued = cap->implemented = CEPH_CAP_PIN;
/* make sure mds knows what we want */
if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
spin_unlock(&ci->i_ceph_lock);
}
} else if (ev == FORCE_RO) {
......@@ -1772,7 +1874,8 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
}
/* The inode has cached pages, but it's no longer used.
* we can safely drop it */
if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
if (S_ISREG(inode->i_mode) &&
wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
!(oissued & CEPH_CAP_FILE_CACHE)) {
used = 0;
oissued = 0;
......@@ -2089,8 +2192,9 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
struct ceph_mds_request *req;
req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
if (!req)
return ERR_PTR(-ENOMEM);
......@@ -2368,7 +2472,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
head->op = cpu_to_le32(req->r_op);
head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
head->ino = 0;
head->ino = cpu_to_le64(req->r_deleg_ino);
head->args = req->r_args;
ceph_encode_filepath(&p, end, ino1, path1);
......@@ -2382,7 +2486,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
if (req->r_inode_drop)
releases += ceph_encode_inode_release(&p,
req->r_inode ? req->r_inode : d_inode(req->r_dentry),
mds, req->r_inode_drop, req->r_inode_unless, 0);
mds, req->r_inode_drop, req->r_inode_unless,
req->r_op == CEPH_MDS_OP_READDIR);
if (req->r_dentry_drop)
releases += ceph_encode_dentry_release(&p, req->r_dentry,
req->r_parent, mds, req->r_dentry_drop,
......@@ -2522,12 +2627,13 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
flags |= CEPH_MDS_FLAG_REPLAY;
if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
flags |= CEPH_MDS_FLAG_ASYNC;
if (req->r_parent)
flags |= CEPH_MDS_FLAG_WANT_DENTRY;
rhead->flags = cpu_to_le32(flags);
rhead->num_fwd = req->r_num_fwd;
rhead->num_retry = req->r_attempts - 1;
rhead->ino = 0;
dout(" r_parent = %p\n", req->r_parent);
return 0;
......@@ -2573,7 +2679,7 @@ static void __do_request(struct ceph_mds_client *mdsc,
if (req->r_timeout &&
time_after_eq(jiffies, req->r_started + req->r_timeout)) {
dout("do_request timed out\n");
err = -EIO;
err = -ETIMEDOUT;
goto finish;
}
if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
......@@ -2605,6 +2711,10 @@ static void __do_request(struct ceph_mds_client *mdsc,
mds = __choose_mds(mdsc, req, &random);
if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
err = -EJUKEBOX;
goto finish;
}
dout("do_request no mds or not active, waiting for map\n");
list_add(&req->r_wait, &mdsc->waiting_for_map);
return;
......@@ -2629,6 +2739,15 @@ static void __do_request(struct ceph_mds_client *mdsc,
err = -EACCES;
goto out_session;
}
/*
* We cannot queue async requests since the caps and delegated
* inodes are bound to the session. Just return -EJUKEBOX and
* let the caller retry a sync request in that case.
*/
if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
err = -EJUKEBOX;
goto out_session;
}
if (session->s_state == CEPH_MDS_SESSION_NEW ||
session->s_state == CEPH_MDS_SESSION_CLOSING) {
__open_session(mdsc, session);
......@@ -2709,19 +2828,43 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
struct ceph_mds_request *req)
{
int err;
int err = 0;
/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
if (req->r_inode)
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
if (req->r_parent) {
ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
struct ceph_inode_info *ci = ceph_inode(req->r_parent);
int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
spin_lock(&ci->i_ceph_lock);
ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
__ceph_touch_fmode(ci, mdsc, fmode);
spin_unlock(&ci->i_ceph_lock);
ihold(req->r_parent);
}
if (req->r_old_dentry_dir)
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
if (req->r_inode) {
err = ceph_wait_on_async_create(req->r_inode);
if (err) {
dout("%s: wait for async create returned: %d\n",
__func__, err);
return err;
}
}
if (!err && req->r_old_inode) {
err = ceph_wait_on_async_create(req->r_old_inode);
if (err) {
dout("%s: wait for async create returned: %d\n",
__func__, err);
return err;
}
}
dout("submit_request on %p for inode %p\n", req, dir);
mutex_lock(&mdsc->mutex);
__register_request(mdsc, req, dir);
......@@ -2747,7 +2890,7 @@ static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
if (timeleft > 0)
err = 0;
else if (!timeleft)
err = -EIO; /* timed out */
err = -ETIMEDOUT; /* timed out */
else
err = timeleft; /* killed */
}
......@@ -2935,22 +3078,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
} else {
set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
if (req->r_unsafe_dir) {
struct ceph_inode_info *ci =
ceph_inode(req->r_unsafe_dir);
spin_lock(&ci->i_unsafe_lock);
list_add_tail(&req->r_unsafe_dir_item,
&ci->i_unsafe_dirops);
spin_unlock(&ci->i_unsafe_lock);
}
}
dout("handle_reply tid %lld result %d\n", tid, result);
rinfo = &req->r_reply_info;
if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
err = parse_reply_info(msg, rinfo, (u64)-1);
err = parse_reply_info(session, msg, rinfo, (u64)-1);
else
err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex);
......@@ -3249,6 +3384,17 @@ static void handle_session(struct ceph_mds_session *session,
return;
}
void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
{
int dcaps;
dcaps = xchg(&req->r_dir_caps, 0);
if (dcaps) {
dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
}
}
/*
* called under session->mutex.
*/
......@@ -3276,9 +3422,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
continue;
if (req->r_attempts == 0)
continue; /* only old requests */
if (req->r_session &&
req->r_session->s_mds == session->s_mds)
__send_request(mdsc, session, req, true);
if (!req->r_session)
continue;
if (req->r_session->s_mds != session->s_mds)
continue;
ceph_mdsc_release_dir_caps(req);
__send_request(mdsc, session, req, true);
}
mutex_unlock(&mdsc->mutex);
}
......@@ -3362,7 +3513,7 @@ static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
/*
* Encode information about a cap for a reconnect with the MDS.
*/
static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
union {
......@@ -3385,6 +3536,15 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
cap->mseq = 0; /* and migrate_seq */
cap->cap_gen = cap->session->s_cap_gen;
/* These are lost when the session goes away */
if (S_ISDIR(inode->i_mode)) {
if (cap->issued & CEPH_CAP_DIR_CREATE) {
ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
}
cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
}
if (recon_state->msg_version >= 2) {
rec.v2.cap_id = cpu_to_le64(cap->cap_id);
rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
......@@ -3626,6 +3786,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
if (!reply)
goto fail_nomsg;
xa_destroy(&session->s_delegated_inos);
mutex_lock(&session->s_mutex);
session->s_state = CEPH_MDS_SESSION_RECONNECTING;
session->s_seq = 0;
......@@ -3681,7 +3843,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
recon_state.msg_version = 2;
}
/* trsaverse this session's caps */
err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
spin_lock(&session->s_cap_lock);
session->s_cap_reconnect = 0;
......
......@@ -23,8 +23,9 @@ enum ceph_feature_type {
CEPHFS_FEATURE_RECLAIM_CLIENT,
CEPHFS_FEATURE_LAZY_CAP_WANTED,
CEPHFS_FEATURE_MULTI_RECONNECT,
CEPHFS_FEATURE_DELEG_INO,
CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_MULTI_RECONNECT,
CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_DELEG_INO,
};
/*
......@@ -37,6 +38,7 @@ enum ceph_feature_type {
CEPHFS_FEATURE_REPLY_ENCODING, \
CEPHFS_FEATURE_LAZY_CAP_WANTED, \
CEPHFS_FEATURE_MULTI_RECONNECT, \
CEPHFS_FEATURE_DELEG_INO, \
\
CEPHFS_FEATURE_MAX, \
}
......@@ -201,6 +203,7 @@ struct ceph_mds_session {
struct list_head s_waiting; /* waiting requests */
struct list_head s_unsafe; /* unsafe requests */
struct xarray s_delegated_inos;
};
/*
......@@ -255,6 +258,7 @@ struct ceph_mds_request {
#define CEPH_MDS_R_GOT_RESULT (5) /* got a result */
#define CEPH_MDS_R_DID_PREPOPULATE (6) /* prepopulated readdir */
#define CEPH_MDS_R_PARENT_LOCKED (7) /* is r_parent->i_rwsem wlocked? */
#define CEPH_MDS_R_ASYNC (8) /* async request */
unsigned long r_req_flags;
struct mutex r_fill_mutex;
......@@ -263,6 +267,7 @@ struct ceph_mds_request {
int r_fmode; /* file mode, if expecting cap */
kuid_t r_uid;
kgid_t r_gid;
int r_request_release_offset;
struct timespec64 r_stamp;
/* for choosing which mds to send this request to */
......@@ -280,12 +285,16 @@ struct ceph_mds_request {
int r_old_inode_drop, r_old_inode_unless;
struct ceph_msg *r_request; /* original request */
int r_request_release_offset;
struct ceph_msg *r_reply;
struct ceph_mds_reply_info_parsed r_reply_info;
struct page *r_locked_page;
int r_err;
struct page *r_locked_page;
int r_dir_caps;
int r_num_caps;
u32 r_readdir_offset;
unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */
unsigned long r_started; /* start time to measure timeout against */
unsigned long r_request_started; /* start time for mds request only,
......@@ -304,6 +313,7 @@ struct ceph_mds_request {
int r_num_fwd; /* number of forward attempts */
int r_resend_mds; /* mds to resend to next, if any*/
u32 r_sent_on_mseq; /* cap mseq request was sent at*/
u64 r_deleg_ino;
struct list_head r_wait;
struct completion r_completion;
......@@ -315,10 +325,8 @@ struct ceph_mds_request {
long long r_dir_release_cnt;
long long r_dir_ordered_cnt;
int r_readdir_cache_idx;
u32 r_readdir_offset;
struct ceph_cap_reservation r_caps_reservation;
int r_num_caps;
};
struct ceph_pool_perm {
......@@ -488,6 +496,7 @@ extern int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
struct inode *dir,
struct ceph_mds_request *req);
extern void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req);
static inline void ceph_mdsc_get_request(struct ceph_mds_request *req)
{
kref_get(&req->r_kref);
......@@ -537,4 +546,15 @@ extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
extern int ceph_trim_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
int max_caps);
static inline int ceph_wait_on_async_create(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
return wait_on_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT,
TASK_INTERRUPTIBLE);
}
extern u64 ceph_get_deleg_ino(struct ceph_mds_session *session);
extern int ceph_restore_deleg_ino(struct ceph_mds_session *session, u64 ino);
#endif
......@@ -155,6 +155,7 @@ enum {
Opt_acl,
Opt_quotadf,
Opt_copyfrom,
Opt_wsync,
};
enum ceph_recover_session_mode {
......@@ -194,6 +195,7 @@ static const struct fs_parameter_spec ceph_mount_parameters[] = {
fsparam_string ("snapdirname", Opt_snapdirname),
fsparam_string ("source", Opt_source),
fsparam_u32 ("wsize", Opt_wsize),
fsparam_flag_no ("wsync", Opt_wsync),
{}
};
......@@ -444,6 +446,12 @@ static int ceph_parse_mount_param(struct fs_context *fc,
fc->sb_flags &= ~SB_POSIXACL;
}
break;
case Opt_wsync:
if (!result.negated)
fsopt->flags &= ~CEPH_MOUNT_OPT_ASYNC_DIROPS;
else
fsopt->flags |= CEPH_MOUNT_OPT_ASYNC_DIROPS;
break;
default:
BUG();
}
......@@ -567,6 +575,9 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
if (fsopt->flags & CEPH_MOUNT_OPT_CLEANRECOVER)
seq_show_option(m, "recover_session", "clean");
if (fsopt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
seq_puts(m, ",nowsync");
if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)
seq_printf(m, ",wsize=%u", fsopt->wsize);
if (fsopt->rsize != CEPH_MAX_READ_SIZE)
......@@ -729,6 +740,7 @@ struct kmem_cache *ceph_cap_flush_cachep;
struct kmem_cache *ceph_dentry_cachep;
struct kmem_cache *ceph_file_cachep;
struct kmem_cache *ceph_dir_file_cachep;
struct kmem_cache *ceph_mds_request_cachep;
static void ceph_inode_init_once(void *foo)
{
......@@ -769,6 +781,10 @@ static int __init init_caches(void)
if (!ceph_dir_file_cachep)
goto bad_dir_file;
ceph_mds_request_cachep = KMEM_CACHE(ceph_mds_request, SLAB_MEM_SPREAD);
if (!ceph_mds_request_cachep)
goto bad_mds_req;
error = ceph_fscache_register();
if (error)
goto bad_fscache;
......@@ -776,6 +792,8 @@ static int __init init_caches(void)
return 0;
bad_fscache:
kmem_cache_destroy(ceph_mds_request_cachep);
bad_mds_req:
kmem_cache_destroy(ceph_dir_file_cachep);
bad_dir_file:
kmem_cache_destroy(ceph_file_cachep);
......@@ -804,6 +822,7 @@ static void destroy_caches(void)
kmem_cache_destroy(ceph_dentry_cachep);
kmem_cache_destroy(ceph_file_cachep);
kmem_cache_destroy(ceph_dir_file_cachep);
kmem_cache_destroy(ceph_mds_request_cachep);
ceph_fscache_unregister();
}
......@@ -1107,6 +1126,15 @@ static void ceph_free_fc(struct fs_context *fc)
static int ceph_reconfigure_fc(struct fs_context *fc)
{
struct ceph_parse_opts_ctx *pctx = fc->fs_private;
struct ceph_mount_options *fsopt = pctx->opts;
struct ceph_fs_client *fsc = ceph_sb_to_client(fc->root->d_sb);
if (fsopt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
ceph_set_mount_opt(fsc, ASYNC_DIROPS);
else
ceph_clear_mount_opt(fsc, ASYNC_DIROPS);
sync_filesystem(fc->root->d_sb);
return 0;
}
......
......@@ -43,13 +43,16 @@
#define CEPH_MOUNT_OPT_MOUNTWAIT (1<<12) /* mount waits if no mds is up */
#define CEPH_MOUNT_OPT_NOQUOTADF (1<<13) /* no root dir quota in statfs */
#define CEPH_MOUNT_OPT_NOCOPYFROM (1<<14) /* don't use RADOS 'copy-from' op */
#define CEPH_MOUNT_OPT_ASYNC_DIROPS (1<<15) /* allow async directory ops */
#define CEPH_MOUNT_OPT_DEFAULT \
(CEPH_MOUNT_OPT_DCACHE | \
CEPH_MOUNT_OPT_NOCOPYFROM)
#define ceph_set_mount_opt(fsc, opt) \
(fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt;
(fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt
#define ceph_clear_mount_opt(fsc, opt) \
(fsc)->mount_options->flags &= ~CEPH_MOUNT_OPT_##opt
#define ceph_test_mount_opt(fsc, opt) \
(!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt))
......@@ -170,9 +173,9 @@ struct ceph_cap {
struct list_head caps_item;
};
#define CHECK_CAPS_NODELAY 1 /* do not delay any further */
#define CHECK_CAPS_AUTHONLY 2 /* only check auth cap */
#define CHECK_CAPS_FLUSH 4 /* flush any dirty caps */
#define CHECK_CAPS_AUTHONLY 1 /* only check auth cap */
#define CHECK_CAPS_FLUSH 2 /* flush any dirty caps */
#define CHECK_CAPS_NOINVAL 4 /* don't invalidate pagecache */
struct ceph_cap_flush {
u64 tid;
......@@ -284,6 +287,7 @@ struct ceph_dentry_info {
#define CEPH_DENTRY_REFERENCED 1
#define CEPH_DENTRY_LEASE_LIST 2
#define CEPH_DENTRY_SHRINK_LIST 4
#define CEPH_DENTRY_PRIMARY_LINK 8
struct ceph_inode_xattrs_info {
/*
......@@ -315,13 +319,14 @@ struct ceph_inode_info {
u64 i_inline_version;
u32 i_time_warp_seq;
unsigned i_ceph_flags;
unsigned long i_ceph_flags;
atomic64_t i_release_count;
atomic64_t i_ordered_count;
atomic64_t i_complete_seq[2];
struct ceph_dir_layout i_dir_layout;
struct ceph_file_layout i_layout;
struct ceph_file_layout i_cached_layout; // for async creates
char *i_symlink;
/* for dirs */
......@@ -352,7 +357,6 @@ struct ceph_inode_info {
struct ceph_cap_flush *i_prealloc_cap_flush;
struct list_head i_cap_flush_list;
wait_queue_head_t i_cap_wq; /* threads waiting on a capability */
unsigned long i_hold_caps_min; /* jiffies */
unsigned long i_hold_caps_max; /* jiffies */
struct list_head i_cap_delay_list; /* for delayed cap release to mds */
struct ceph_cap_reservation i_cap_migration_resv;
......@@ -361,6 +365,8 @@ struct ceph_inode_info {
dirty|flushing caps */
unsigned i_snap_caps; /* cap bits for snapped files */
unsigned long i_last_rd;
unsigned long i_last_wr;
int i_nr_by_mode[CEPH_FILE_MODE_BITS]; /* open file counts */
struct mutex i_truncate_mutex;
......@@ -375,7 +381,7 @@ struct ceph_inode_info {
/* held references to caps */
int i_pin_ref;
int i_rd_ref, i_rdcache_ref, i_wr_ref, i_wb_ref;
int i_rd_ref, i_rdcache_ref, i_wr_ref, i_wb_ref, i_fx_ref;
int i_wrbuffer_ref, i_wrbuffer_ref_head;
atomic_t i_filelock_ref;
atomic_t i_shared_gen; /* increment each time we get FILE_SHARED */
......@@ -511,18 +517,18 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
* Ceph inode.
*/
#define CEPH_I_DIR_ORDERED (1 << 0) /* dentries in dir are ordered */
#define CEPH_I_NODELAY (1 << 1) /* do not delay cap release */
#define CEPH_I_FLUSH (1 << 2) /* do not delay flush of dirty metadata */
#define CEPH_I_POOL_PERM (1 << 3) /* pool rd/wr bits are valid */
#define CEPH_I_POOL_RD (1 << 4) /* can read from pool */
#define CEPH_I_POOL_WR (1 << 5) /* can write to pool */
#define CEPH_I_SEC_INITED (1 << 6) /* security initialized */
#define CEPH_I_CAP_DROPPED (1 << 7) /* caps were forcibly dropped */
#define CEPH_I_KICK_FLUSH (1 << 8) /* kick flushing caps */
#define CEPH_I_FLUSH_SNAPS (1 << 9) /* need flush snapss */
#define CEPH_I_ERROR_WRITE (1 << 10) /* have seen write errors */
#define CEPH_I_ERROR_FILELOCK (1 << 11) /* have seen file lock errors */
#define CEPH_I_ODIRECT (1 << 12) /* inode in direct I/O mode */
#define CEPH_I_KICK_FLUSH (1 << 7) /* kick flushing caps */
#define CEPH_I_FLUSH_SNAPS (1 << 8) /* need flush snapss */
#define CEPH_I_ERROR_WRITE (1 << 9) /* have seen write errors */
#define CEPH_I_ERROR_FILELOCK (1 << 10) /* have seen file lock errors */
#define CEPH_I_ODIRECT (1 << 11) /* inode in direct I/O mode */
#define CEPH_ASYNC_CREATE_BIT (12) /* async create in flight for this */
#define CEPH_I_ASYNC_CREATE (1 << CEPH_ASYNC_CREATE_BIT)
/*
* Masks of ceph inode work.
......@@ -674,18 +680,12 @@ extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
extern int ceph_caps_revoking(struct ceph_inode_info *ci, int mask);
extern int __ceph_caps_used(struct ceph_inode_info *ci);
extern int __ceph_caps_file_wanted(struct ceph_inode_info *ci);
/*
* wanted, by virtue of open file modes AND cap refs (buffered/cached data)
*/
static inline int __ceph_caps_wanted(struct ceph_inode_info *ci)
static inline bool __ceph_is_file_opened(struct ceph_inode_info *ci)
{
int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
if (w & CEPH_CAP_FILE_BUFFER)
w |= CEPH_CAP_FILE_EXCL; /* we want EXCL if dirty data */
return w;
return ci->i_nr_by_mode[0];
}
extern int __ceph_caps_file_wanted(struct ceph_inode_info *ci);
extern int __ceph_caps_wanted(struct ceph_inode_info *ci);
/* what the mds thinks we want */
extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
......@@ -899,6 +899,9 @@ static inline bool __ceph_have_pending_cap_snap(struct ceph_inode_info *ci)
}
/* inode.c */
struct ceph_mds_reply_info_in;
struct ceph_mds_reply_dirfrag;
extern const struct inode_operations ceph_file_iops;
extern struct inode *ceph_alloc_inode(struct super_block *sb);
......@@ -914,6 +917,11 @@ extern void ceph_fill_file_time(struct inode *inode, int issued,
u64 time_warp_seq, struct timespec64 *ctime,
struct timespec64 *mtime,
struct timespec64 *atime);
extern int ceph_fill_inode(struct inode *inode, struct page *locked_page,
struct ceph_mds_reply_info_in *iinfo,
struct ceph_mds_reply_dirfrag *dirinfo,
struct ceph_mds_session *session, int cap_fmode,
struct ceph_cap_reservation *caps_reservation);
extern int ceph_fill_trace(struct super_block *sb,
struct ceph_mds_request *req);
extern int ceph_readdir_prepopulate(struct ceph_mds_request *req,
......@@ -1042,7 +1050,7 @@ extern struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx);
extern void ceph_add_cap(struct inode *inode,
struct ceph_mds_session *session, u64 cap_id,
int fmode, unsigned issued, unsigned wanted,
unsigned issued, unsigned wanted,
unsigned cap, unsigned seq, u64 realmino, int flags,
struct ceph_cap **new_cap);
extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
......@@ -1058,8 +1066,12 @@ extern void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
void ceph_kick_flushing_inode_caps(struct ceph_mds_session *session,
struct ceph_inode_info *ci);
extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci,
int mds);
extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int caps,
bool snap_rwsem_locked);
extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
......@@ -1084,8 +1096,10 @@ extern int ceph_try_get_caps(struct inode *inode,
int need, int want, bool nonblock, int *got);
/* for counting open files by mode */
extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);
extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode);
extern void ceph_get_fmode(struct ceph_inode_info *ci, int mode, int count);
extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode, int count);
extern void __ceph_touch_fmode(struct ceph_inode_info *ci,
struct ceph_mds_client *mdsc, int fmode);
/* addr.c */
extern const struct address_space_operations ceph_aops;
......@@ -1097,7 +1111,7 @@ extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
/* file.c */
extern const struct file_operations ceph_file_fops;
extern int ceph_renew_caps(struct inode *inode);
extern int ceph_renew_caps(struct inode *inode, int fmode);
extern int ceph_open(struct inode *inode, struct file *file);
extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct file *file, unsigned flags, umode_t mode);
......
......@@ -444,8 +444,9 @@ union ceph_mds_request_args {
} __attribute__ ((packed)) lookupino;
} __attribute__ ((packed));
#define CEPH_MDS_FLAG_REPLAY 1 /* this is a replayed op */
#define CEPH_MDS_FLAG_WANT_DENTRY 2 /* want dentry in reply */
#define CEPH_MDS_FLAG_REPLAY 1 /* this is a replayed op */
#define CEPH_MDS_FLAG_WANT_DENTRY 2 /* want dentry in reply */
#define CEPH_MDS_FLAG_ASYNC 4 /* request is asynchronous */
struct ceph_mds_request_head {
__le64 oldest_client_tid;
......@@ -530,6 +531,9 @@ struct ceph_mds_reply_lease {
__le32 seq;
} __attribute__ ((packed));
#define CEPH_LEASE_VALID (1 | 2) /* old and new bit values */
#define CEPH_LEASE_PRIMARY_LINK 4 /* primary linkage */
struct ceph_mds_reply_dirfrag {
__le32 frag; /* fragment */
__le32 auth; /* auth mds, if this is a delegation point */
......@@ -564,6 +568,7 @@ struct ceph_filelock {
#define CEPH_FILE_MODE_RDWR 3 /* RD | WR */
#define CEPH_FILE_MODE_LAZY 4 /* lazy io */
#define CEPH_FILE_MODE_BITS 4
#define CEPH_FILE_MODE_MASK ((1 << CEPH_FILE_MODE_BITS) - 1)
int ceph_flags_to_mode(int flags);
......@@ -655,10 +660,19 @@ int ceph_flags_to_mode(int flags);
#define CEPH_CAP_ANY (CEPH_CAP_ANY_RD | CEPH_CAP_ANY_EXCL | \
CEPH_CAP_ANY_FILE_WR | CEPH_CAP_FILE_LAZYIO | \
CEPH_CAP_PIN)
#define CEPH_CAP_ALL_FILE (CEPH_CAP_PIN | CEPH_CAP_ANY_SHARED | \
CEPH_CAP_AUTH_EXCL | CEPH_CAP_XATTR_EXCL | \
CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)
#define CEPH_CAP_LOCKS (CEPH_LOCK_IFILE | CEPH_LOCK_IAUTH | CEPH_LOCK_ILINK | \
CEPH_LOCK_IXATTR)
/* cap masks async dir operations */
#define CEPH_CAP_DIR_CREATE CEPH_CAP_FILE_CACHE
#define CEPH_CAP_DIR_UNLINK CEPH_CAP_FILE_RD
#define CEPH_CAP_ANY_DIR_OPS (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_RD | \
CEPH_CAP_FILE_WREXTEND | CEPH_CAP_FILE_LAZYIO)
int ceph_caps_for_mode(int mode);
enum {
......
......@@ -2,22 +2,8 @@
#ifndef _FS_CEPH_DEBUGFS_H
#define _FS_CEPH_DEBUGFS_H
#include <linux/ceph/ceph_debug.h>
#include <linux/ceph/types.h>
#define CEPH_DEFINE_SHOW_FUNC(name) \
static int name##_open(struct inode *inode, struct file *file) \
{ \
return single_open(file, name, inode->i_private); \
} \
\
static const struct file_operations name##_fops = { \
.open = name##_open, \
.read = seq_read, \
.llseek = seq_lseek, \
.release = single_release, \
};
/* debugfs.c */
extern void ceph_debugfs_init(void);
extern void ceph_debugfs_cleanup(void);
......
......@@ -272,6 +272,7 @@ extern struct kmem_cache *ceph_cap_flush_cachep;
extern struct kmem_cache *ceph_dentry_cachep;
extern struct kmem_cache *ceph_file_cachep;
extern struct kmem_cache *ceph_dir_file_cachep;
extern struct kmem_cache *ceph_mds_request_cachep;
/* ceph_common.c */
extern bool libceph_compatible(void *data);
......
......@@ -509,23 +509,6 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
struct page *req_page, size_t req_len,
struct page **resp_pages, size_t *resp_len);
extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
u64 off, u64 *plen,
u32 truncate_seq, u64 truncate_size,
struct page **pages, int nr_pages,
int page_align);
extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
struct ceph_snap_context *sc,
u64 off, u64 len,
u32 truncate_seq, u64 truncate_size,
struct timespec64 *mtime,
struct page **pages, int nr_pages);
int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
u64 src_snapid, u64 src_version,
struct ceph_object_id *src_oid,
......
......@@ -383,11 +383,11 @@ static int client_options_show(struct seq_file *s, void *p)
return 0;
}
CEPH_DEFINE_SHOW_FUNC(monmap_show)
CEPH_DEFINE_SHOW_FUNC(osdmap_show)
CEPH_DEFINE_SHOW_FUNC(monc_show)
CEPH_DEFINE_SHOW_FUNC(osdc_show)
CEPH_DEFINE_SHOW_FUNC(client_options_show)
DEFINE_SHOW_ATTRIBUTE(monmap);
DEFINE_SHOW_ATTRIBUTE(osdmap);
DEFINE_SHOW_ATTRIBUTE(monc);
DEFINE_SHOW_ATTRIBUTE(osdc);
DEFINE_SHOW_ATTRIBUTE(client_options);
void __init ceph_debugfs_init(void)
{
......@@ -414,31 +414,31 @@ void ceph_debugfs_client_init(struct ceph_client *client)
0400,
client->debugfs_dir,
client,
&monc_show_fops);
&monc_fops);
client->osdc.debugfs_file = debugfs_create_file("osdc",
0400,
client->debugfs_dir,
client,
&osdc_show_fops);
&osdc_fops);
client->debugfs_monmap = debugfs_create_file("monmap",
0400,
client->debugfs_dir,
client,
&monmap_show_fops);
&monmap_fops);
client->debugfs_osdmap = debugfs_create_file("osdmap",
0400,
client->debugfs_dir,
client,
&osdmap_show_fops);
&osdmap_fops);
client->debugfs_options = debugfs_create_file("client_options",
0400,
client->debugfs_dir,
client,
&client_options_show_fops);
&client_options_fops);
}
void ceph_debugfs_client_cleanup(struct ceph_client *client)
......
......@@ -467,7 +467,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
struct ceph_client *client = monc->client;
struct ceph_monmap *monmap = NULL, *old = monc->monmap;
struct ceph_monmap *monmap;
void *p, *end;
mutex_lock(&monc->mutex);
......@@ -484,13 +484,13 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
goto out;
}
if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
if (ceph_check_fsid(client, &monmap->fsid) < 0) {
kfree(monmap);
goto out;
}
client->monc.monmap = monmap;
kfree(old);
kfree(monc->monmap);
monc->monmap = monmap;
__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
client->have_fsid = true;
......
......@@ -3483,9 +3483,6 @@ static int ceph_redirect_decode(void **p, void *end,
goto e_inval;
}
len = ceph_decode_32(p);
*p += len; /* skip osd_instructions */
/* skip the rest */
*p = struct_end;
out:
......@@ -5228,85 +5225,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
/*
* Read some contiguous pages. If we cross a stripe boundary, shorten
* *plen. Return number of bytes read, or error.
*/
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct ceph_vino vino, struct ceph_file_layout *layout,
u64 off, u64 *plen,
u32 truncate_seq, u64 truncate_size,
struct page **pages, int num_pages, int page_align)
{
struct ceph_osd_request *req;
int rc = 0;
dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
vino.snap, off, *plen);
req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, truncate_seq, truncate_size,
false);
if (IS_ERR(req))
return PTR_ERR(req);
/* it may be a short read due to an object boundary */
osd_req_op_extent_osd_data_pages(req, 0,
pages, *plen, page_align, false, false);
dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
off, *plen, *plen, page_align);
rc = ceph_osdc_start_request(osdc, req, false);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
ceph_osdc_put_request(req);
dout("readpages result %d\n", rc);
return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);
/*
* do a synchronous write on N pages
*/
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
struct ceph_snap_context *snapc,
u64 off, u64 len,
u32 truncate_seq, u64 truncate_size,
struct timespec64 *mtime,
struct page **pages, int num_pages)
{
struct ceph_osd_request *req;
int rc = 0;
int page_align = off & ~PAGE_MASK;
req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
snapc, truncate_seq, truncate_size,
true);
if (IS_ERR(req))
return PTR_ERR(req);
/* it may be a short write due to an object boundary */
osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
false, false);
dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
req->r_mtime = *mtime;
rc = ceph_osdc_start_request(osdc, req, true);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
ceph_osdc_put_request(req);
if (rc == 0)
rc = len;
dout("writepages result %d\n", rc);
return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);
static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
u64 src_snapid, u64 src_version,
struct ceph_object_id *src_oid,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册