diff --git a/MAINTAINERS b/MAINTAINERS index e1c83998b85863fc81744806d0f4613ebd0d644b..5b4e73f0afd00c8db0baf4193031da9b6f5342d6 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -4456,6 +4456,7 @@ F: drivers/power/supply/cw2015_battery.c CEPH COMMON CODE (LIBCEPH) M: Ilya Dryomov M: Jeff Layton +M: Xiubo Li L: ceph-devel@vger.kernel.org S: Supported W: http://ceph.com/ @@ -4466,6 +4467,7 @@ F: net/ceph/ CEPH DISTRIBUTED FILE SYSTEM CLIENT (CEPH) M: Jeff Layton +M: Xiubo Li M: Ilya Dryomov L: ceph-devel@vger.kernel.org S: Supported diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index f6135c93ce9d58f0bd291844a1bd7d88e781f28b..c7a0ab0d298ba68917a9e95c953260d6f269f798 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -184,7 +184,7 @@ static int ceph_releasepage(struct page *page, gfp_t gfp) static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq) { - struct inode *inode = rreq->mapping->host; + struct inode *inode = rreq->inode; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_file_layout *lo = &ci->i_layout; u32 blockoff; @@ -201,7 +201,7 @@ static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq) static bool ceph_netfs_clamp_length(struct netfs_read_subrequest *subreq) { - struct inode *inode = subreq->rreq->mapping->host; + struct inode *inode = subreq->rreq->inode; struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); u64 objno, objoff; @@ -244,10 +244,63 @@ static void finish_netfs_read(struct ceph_osd_request *req) iput(req->r_inode); } +static bool ceph_netfs_issue_op_inline(struct netfs_read_subrequest *subreq) +{ + struct netfs_read_request *rreq = subreq->rreq; + struct inode *inode = rreq->inode; + struct ceph_mds_reply_info_parsed *rinfo; + struct ceph_mds_reply_info_in *iinfo; + struct ceph_mds_request *req; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); + struct ceph_inode_info *ci = ceph_inode(inode); + struct iov_iter iter; + ssize_t err = 0; + size_t len; + + __set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags); + __clear_bit(NETFS_SREQ_WRITE_TO_CACHE, &subreq->flags); + + if (subreq->start >= inode->i_size) + goto out; + + /* We need to fetch the inline data. */ + req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS); + if (IS_ERR(req)) { + err = PTR_ERR(req); + goto out; + } + req->r_ino1 = ci->i_vino; + req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INLINE_DATA); + req->r_num_caps = 2; + + err = ceph_mdsc_do_request(mdsc, NULL, req); + if (err < 0) + goto out; + + rinfo = &req->r_reply_info; + iinfo = &rinfo->targeti; + if (iinfo->inline_version == CEPH_INLINE_NONE) { + /* The data got uninlined */ + ceph_mdsc_put_request(req); + return false; + } + + len = min_t(size_t, iinfo->inline_len - subreq->start, subreq->len); + iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len); + err = copy_to_iter(iinfo->inline_data + subreq->start, len, &iter); + if (err == 0) + err = -EFAULT; + + ceph_mdsc_put_request(req); +out: + netfs_subreq_terminated(subreq, err, false); + return true; +} + static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq) { struct netfs_read_request *rreq = subreq->rreq; - struct inode *inode = rreq->mapping->host; + struct inode *inode = rreq->inode; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_osd_request *req; @@ -258,6 +311,10 @@ static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq) int err = 0; u64 len = subreq->len; + if (ci->i_inline_version != CEPH_INLINE_NONE && + ceph_netfs_issue_op_inline(subreq)) + return; + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len, 0, 1, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica, @@ -326,23 +383,9 @@ static int ceph_readpage(struct file *file, struct page *subpage) size_t len = folio_size(folio); u64 off = folio_file_pos(folio); - if (ci->i_inline_version != CEPH_INLINE_NONE) { - /* - * Uptodate inline data should have been added - * into page cache while getting Fcr caps. - */ - if (off == 0) { - folio_unlock(folio); - return -EINVAL; - } - zero_user_segment(&folio->page, 0, folio_size(folio)); - folio_mark_uptodate(folio); - folio_unlock(folio); - return 0; - } - - dout("readpage ino %llx.%llx file %p off %llu len %zu folio %p index %lu\n", - vino.ino, vino.snap, file, off, len, folio, folio_index(folio)); + dout("readpage ino %llx.%llx file %p off %llu len %zu folio %p index %lu\n inline %d", + vino.ino, vino.snap, file, off, len, folio, folio_index(folio), + ci->i_inline_version != CEPH_INLINE_NONE); return netfs_readpage(file, folio, &ceph_netfs_read_ops, NULL); } @@ -1281,45 +1324,11 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, struct page **pagep, void **fsdata) { struct inode *inode = file_inode(file); - struct ceph_inode_info *ci = ceph_inode(inode); struct folio *folio = NULL; - pgoff_t index = pos >> PAGE_SHIFT; int r; - /* - * Uninlining should have already been done and everything updated, EXCEPT - * for inline_version sent to the MDS. - */ - if (ci->i_inline_version != CEPH_INLINE_NONE) { - unsigned int fgp_flags = FGP_LOCK | FGP_WRITE | FGP_CREAT | FGP_STABLE; - if (aop_flags & AOP_FLAG_NOFS) - fgp_flags |= FGP_NOFS; - folio = __filemap_get_folio(mapping, index, fgp_flags, - mapping_gfp_mask(mapping)); - if (!folio) - return -ENOMEM; - - /* - * The inline_version on a new inode is set to 1. If that's the - * case, then the folio is brand new and isn't yet Uptodate. - */ - r = 0; - if (index == 0 && ci->i_inline_version != 1) { - if (!folio_test_uptodate(folio)) { - WARN_ONCE(1, "ceph: write_begin called on still-inlined inode (inline_version %llu)!\n", - ci->i_inline_version); - r = -EINVAL; - } - goto out; - } - zero_user_segment(&folio->page, 0, folio_size(folio)); - folio_mark_uptodate(folio); - goto out; - } - r = netfs_write_begin(file, inode->i_mapping, pos, len, 0, &folio, NULL, &ceph_netfs_read_ops, NULL); -out: if (r == 0) folio_wait_fscache(folio); if (r < 0) { @@ -1515,19 +1524,6 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) sb_start_pagefault(inode->i_sb); ceph_block_sigs(&oldset); - if (ci->i_inline_version != CEPH_INLINE_NONE) { - struct page *locked_page = NULL; - if (off == 0) { - lock_page(page); - locked_page = page; - } - err = ceph_uninline_data(vma->vm_file, locked_page); - if (locked_page) - unlock_page(locked_page); - if (err < 0) - goto out_free; - } - if (off + thp_size(page) <= size) len = thp_size(page); else @@ -1584,11 +1580,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) ceph_put_snap_context(snapc); } while (err == 0); - if (ret == VM_FAULT_LOCKED || - ci->i_inline_version != CEPH_INLINE_NONE) { + if (ret == VM_FAULT_LOCKED) { int dirty; spin_lock(&ci->i_ceph_lock); - ci->i_inline_version = CEPH_INLINE_NONE; dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf); spin_unlock(&ci->i_ceph_lock); @@ -1652,16 +1646,30 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, } } -int ceph_uninline_data(struct file *filp, struct page *locked_page) +int ceph_uninline_data(struct file *file) { - struct inode *inode = file_inode(filp); + struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_osd_request *req; - struct page *page = NULL; - u64 len, inline_version; + struct ceph_cap_flush *prealloc_cf; + struct folio *folio = NULL; + u64 inline_version = CEPH_INLINE_NONE; + struct page *pages[1]; int err = 0; - bool from_pagecache = false; + u64 len; + + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + + folio = read_mapping_folio(inode->i_mapping, 0, file); + if (IS_ERR(folio)) { + err = PTR_ERR(folio); + goto out; + } + + folio_lock(folio); spin_lock(&ci->i_ceph_lock); inline_version = ci->i_inline_version; @@ -1672,45 +1680,11 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) if (inline_version == 1 || /* initial version, no data */ inline_version == CEPH_INLINE_NONE) - goto out; - - if (locked_page) { - page = locked_page; - WARN_ON(!PageUptodate(page)); - } else if (ceph_caps_issued(ci) & - (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) { - page = find_get_page(inode->i_mapping, 0); - if (page) { - if (PageUptodate(page)) { - from_pagecache = true; - lock_page(page); - } else { - put_page(page); - page = NULL; - } - } - } + goto out_unlock; - if (page) { - len = i_size_read(inode); - if (len > PAGE_SIZE) - len = PAGE_SIZE; - } else { - page = __page_cache_alloc(GFP_NOFS); - if (!page) { - err = -ENOMEM; - goto out; - } - err = __ceph_do_getattr(inode, page, - CEPH_STAT_CAP_INLINE_DATA, true); - if (err < 0) { - /* no inline data */ - if (err == -ENODATA) - err = 0; - goto out; - } - len = err; - } + len = i_size_read(inode); + if (len > folio_size(folio)) + len = folio_size(folio); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, ceph_vino(inode), 0, &len, 0, 1, @@ -1718,7 +1692,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) NULL, 0, 0, false); if (IS_ERR(req)) { err = PTR_ERR(req); - goto out; + goto out_unlock; } req->r_mtime = inode->i_mtime; @@ -1727,7 +1701,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) err = ceph_osdc_wait_request(&fsc->client->osdc, req); ceph_osdc_put_request(req); if (err < 0) - goto out; + goto out_unlock; req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, ceph_vino(inode), 0, &len, 1, 3, @@ -1736,10 +1710,11 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ci->i_truncate_size, false); if (IS_ERR(req)) { err = PTR_ERR(req); - goto out; + goto out_unlock; } - osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false); + pages[0] = folio_page(folio, 0); + osd_req_op_extent_osd_data_pages(req, 1, pages, len, 0, false, false); { __le64 xattr_buf = cpu_to_le64(inline_version); @@ -1749,7 +1724,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) CEPH_OSD_CMPXATTR_OP_GT, CEPH_OSD_CMPXATTR_MODE_U64); if (err) - goto out_put; + goto out_put_req; } { @@ -1760,7 +1735,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) "inline_version", xattr_buf, xattr_len, 0, 0); if (err) - goto out_put; + goto out_put_req; } req->r_mtime = inode->i_mtime; @@ -1771,19 +1746,28 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, req->r_end_latency, len, err); -out_put: + if (!err) { + int dirty; + + /* Set to CAP_INLINE_NONE and dirty the caps */ + down_read(&fsc->mdsc->snap_rwsem); + spin_lock(&ci->i_ceph_lock); + ci->i_inline_version = CEPH_INLINE_NONE; + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf); + spin_unlock(&ci->i_ceph_lock); + up_read(&fsc->mdsc->snap_rwsem); + if (dirty) + __mark_inode_dirty(inode, dirty); + } +out_put_req: ceph_osdc_put_request(req); if (err == -ECANCELED) err = 0; +out_unlock: + folio_unlock(folio); + folio_put(folio); out: - if (page && page != locked_page) { - if (from_pagecache) { - unlock_page(page); - put_page(page); - } else - __free_pages(page, 0); - } - + ceph_free_cap_flush(prealloc_cf); dout("uninline_data %p %llx.%llx inline_version %llu = %d\n", inode, ceph_vinop(inode), inline_version, err); return err; diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index b472cd066d1c8f86ea3e13a6538b773cd9bb946f..f1ad6884d4dafdfb7403b26646274211e34f91eb 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1915,6 +1915,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ceph_get_mds_session(session); spin_lock(&ci->i_ceph_lock); + if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) { + /* Don't send messages until we get async create reply */ + spin_unlock(&ci->i_ceph_lock); + ceph_put_mds_session(session); + return; + } + if (ci->i_ceph_flags & CEPH_I_FLUSH) flags |= CHECK_CAPS_FLUSH; retry: @@ -2409,6 +2416,9 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) dout("write_inode %p wait=%d\n", inode, wait); ceph_fscache_unpin_writeback(inode, wbc); if (wait) { + err = ceph_wait_on_async_create(inode); + if (err) + return err; dirty = try_flush_caps(inode, &flush_tid); if (dirty) err = wait_event_interruptible(ci->i_cap_wq, @@ -2439,6 +2449,10 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc, u64 first_tid = 0; u64 last_snap_flush = 0; + /* Don't do anything until create reply comes in */ + if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) + return; + ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) { @@ -4152,7 +4166,6 @@ void ceph_handle_caps(struct ceph_mds_session *session, /* lookup ino */ inode = ceph_find_inode(mdsc->fsc->sb, vino); - ci = ceph_inode(inode); dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino, vino.snap, inode); @@ -4178,6 +4191,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, } goto flush_cap_releases; } + ci = ceph_inode(inode); /* these will work even if we don't have a cap yet */ switch (op) { diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 3cf7c9c1085b9121865885847eaec42478c6b465..bec3c4549c07dadfef8c416a2aa380e4b16275bc 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -175,7 +175,7 @@ static int metrics_latency_show(struct seq_file *s, void *p) struct ceph_fs_client *fsc = s->private; struct ceph_client_metric *cm = &fsc->mdsc->metric; struct ceph_metric *m; - s64 total, sum, avg, min, max, sq; + s64 total, avg, min, max, sq; int i; seq_printf(s, "item total avg_lat(us) min_lat(us) max_lat(us) stdev(us)\n"); @@ -185,8 +185,7 @@ static int metrics_latency_show(struct seq_file *s, void *p) m = &cm->metric[i]; spin_lock(&m->lock); total = m->total; - sum = m->latency_sum; - avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0; + avg = m->latency_avg; min = m->latency_min; max = m->latency_max; sq = m->latency_sq_sum; diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 133dbd9338e7304e40cda2a86ffca10a9905d3a9..eae417d71136411a675497cdada19ff64d43b1a0 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -145,7 +145,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx, return ERR_PTR(-EAGAIN); } /* reading/filling the cache are serialized by - i_mutex, no need to use page lock */ + i_rwsem, no need to use page lock */ unlock_page(cache_ctl->page); cache_ctl->dentries = kmap(cache_ctl->page); } @@ -155,7 +155,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx, rcu_read_lock(); spin_lock(&parent->d_lock); /* check i_size again here, because empty directory can be - * marked as complete while not holding the i_mutex. */ + * marked as complete while not holding the i_rwsem. */ if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir)) dentry = cache_ctl->dentries[cache_ctl->index]; else @@ -478,8 +478,11 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) 2 : (fpos_off(rde->offset) + 1); err = note_last_dentry(dfi, rde->name, rde->name_len, next_offset); - if (err) + if (err) { + ceph_mdsc_put_request(dfi->last_readdir); + dfi->last_readdir = NULL; return err; + } } else if (req->r_reply_info.dir_end) { dfi->next_offset = 2; /* keep last name */ @@ -520,6 +523,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) if (!dir_emit(ctx, rde->name, rde->name_len, ceph_present_ino(inode->i_sb, le64_to_cpu(rde->inode.in->ino)), le32_to_cpu(rde->inode.in->mode) >> 12)) { + /* + * NOTE: Here no need to put the 'dfi->last_readdir', + * because when dir_emit stops us it's most likely + * doesn't have enough memory, etc. So for next readdir + * it will continue. + */ dout("filldir stopping us...\n"); return 0; } @@ -671,7 +680,7 @@ struct dentry *ceph_handle_snapdir(struct ceph_mds_request *req, struct dentry *dentry) { struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); - struct inode *parent = d_inode(dentry->d_parent); /* we hold i_mutex */ + struct inode *parent = d_inode(dentry->d_parent); /* we hold i_rwsem */ /* .snap dir? */ if (ceph_snap(parent) == CEPH_NOSNAP && diff --git a/fs/ceph/file.c b/fs/ceph/file.c index bbed3224ad689c15fa3b288c3536eb64cc4f9935..feb75eb1cd82651f17e67892f359ad307fd08d16 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -207,6 +207,7 @@ static int ceph_init_file_info(struct inode *inode, struct file *file, struct ceph_mount_options *opt = ceph_inode_to_client(&ci->vfs_inode)->mount_options; struct ceph_file_info *fi; + int ret; dout("%s %p %p 0%o (%s)\n", __func__, inode, file, inode->i_mode, isdir ? "dir" : "regular"); @@ -240,7 +241,22 @@ static int ceph_init_file_info(struct inode *inode, struct file *file, INIT_LIST_HEAD(&fi->rw_contexts); fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen); + if ((file->f_mode & FMODE_WRITE) && + ci->i_inline_version != CEPH_INLINE_NONE) { + ret = ceph_uninline_data(file); + if (ret < 0) + goto error; + } + return 0; + +error: + ceph_fscache_unuse_cookie(inode, file->f_mode & FMODE_WRITE); + ceph_put_fmode(ci, fi->fmode, 1); + kmem_cache_free(ceph_file_cachep, fi); + /* wake up anyone waiting for caps on this inode */ + wake_up_all(&ci->i_cap_wq); + return ret; } /* @@ -516,52 +532,67 @@ static void restore_deleg_ino(struct inode *dir, u64 ino) } } +static void wake_async_create_waiters(struct inode *inode, + struct ceph_mds_session *session) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + + spin_lock(&ci->i_ceph_lock); + if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) { + ci->i_ceph_flags &= ~CEPH_I_ASYNC_CREATE; + wake_up_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT); + } + ceph_kick_flushing_inode_caps(session, ci); + spin_unlock(&ci->i_ceph_lock); +} + static void ceph_async_create_cb(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) { + struct dentry *dentry = req->r_dentry; + struct inode *dinode = d_inode(dentry); + struct inode *tinode = req->r_target_inode; int result = req->r_err ? req->r_err : le32_to_cpu(req->r_reply_info.head->result); + WARN_ON_ONCE(dinode && tinode && dinode != tinode); + + /* MDS changed -- caller must resubmit */ if (result == -EJUKEBOX) goto out; mapping_set_error(req->r_parent->i_mapping, result); if (result) { - struct dentry *dentry = req->r_dentry; - struct inode *inode = d_inode(dentry); int pathlen = 0; u64 base = 0; char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen, &base, 0); + pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n", + base, IS_ERR(path) ? "<>" : path, result); + ceph_mdsc_free_path(path, pathlen); + ceph_dir_clear_complete(req->r_parent); if (!d_unhashed(dentry)) d_drop(dentry); - ceph_inode_shutdown(inode); - - pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n", - base, IS_ERR(path) ? "<>" : path, result); - ceph_mdsc_free_path(path, pathlen); + if (dinode) { + mapping_set_error(dinode->i_mapping, result); + ceph_inode_shutdown(dinode); + wake_async_create_waiters(dinode, req->r_session); + } } - if (req->r_target_inode) { - struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); - u64 ino = ceph_vino(req->r_target_inode).ino; + if (tinode) { + u64 ino = ceph_vino(tinode).ino; if (req->r_deleg_ino != ino) pr_warn("%s: inode number mismatch! err=%d deleg_ino=0x%llx target=0x%llx\n", __func__, req->r_err, req->r_deleg_ino, ino); - mapping_set_error(req->r_target_inode->i_mapping, result); - spin_lock(&ci->i_ceph_lock); - if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) { - ci->i_ceph_flags &= ~CEPH_I_ASYNC_CREATE; - wake_up_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT); - } - ceph_kick_flushing_inode_caps(req->r_session, ci); - spin_unlock(&ci->i_ceph_lock); + mapping_set_error(tinode->i_mapping, result); + wake_async_create_waiters(tinode, req->r_session); } else if (!result) { pr_warn("%s: no req->r_target_inode for 0x%llx\n", __func__, req->r_deleg_ino); @@ -1041,7 +1072,6 @@ static void ceph_aio_complete(struct inode *inode, } spin_lock(&ci->i_ceph_lock); - ci->i_inline_version = CEPH_INLINE_NONE; dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &aio_req->prealloc_cf); spin_unlock(&ci->i_ceph_lock); @@ -1778,12 +1808,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) if (err) goto out; - if (ci->i_inline_version != CEPH_INLINE_NONE) { - err = ceph_uninline_data(file, NULL); - if (err < 0) - goto out; - } - dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n", inode, ceph_vinop(inode), pos, count, i_size_read(inode)); if (!(fi->flags & CEPH_F_SYNC) && !direct_lock) @@ -1855,7 +1879,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) int dirty; spin_lock(&ci->i_ceph_lock); - ci->i_inline_version = CEPH_INLINE_NONE; dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf); spin_unlock(&ci->i_ceph_lock); @@ -2109,12 +2132,6 @@ static long ceph_fallocate(struct file *file, int mode, goto unlock; } - if (ci->i_inline_version != CEPH_INLINE_NONE) { - ret = ceph_uninline_data(file, NULL); - if (ret < 0) - goto unlock; - } - size = i_size_read(inode); /* Are we punching a hole beyond EOF? */ @@ -2139,7 +2156,6 @@ static long ceph_fallocate(struct file *file, int mode, if (!ret) { spin_lock(&ci->i_ceph_lock); - ci->i_inline_version = CEPH_INLINE_NONE; dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf); spin_unlock(&ci->i_ceph_lock); @@ -2532,7 +2548,6 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off, } /* Mark Fw dirty */ spin_lock(&dst_ci->i_ceph_lock); - dst_ci->i_inline_version = CEPH_INLINE_NONE; dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf); spin_unlock(&dst_ci->i_ceph_lock); if (dirty) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 9cfa6c730519423ca1a7d545d106d207667c6e84..d80911dc91c2010ddd7da20467ffc1490132957f 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -87,13 +87,13 @@ struct inode *ceph_get_snapdir(struct inode *parent) if (!S_ISDIR(parent->i_mode)) { pr_warn_once("bad snapdir parent type (mode=0%o)\n", parent->i_mode); - return ERR_PTR(-ENOTDIR); + goto err; } if (!(inode->i_state & I_NEW) && !S_ISDIR(inode->i_mode)) { pr_warn_once("bad snapdir inode type (mode=0%o)\n", inode->i_mode); - return ERR_PTR(-ENOTDIR); + goto err; } inode->i_mode = parent->i_mode; @@ -113,6 +113,12 @@ struct inode *ceph_get_snapdir(struct inode *parent) } return inode; +err: + if ((inode->i_state & I_NEW)) + discard_new_inode(inode); + else + iput(inode); + return ERR_PTR(-ENOTDIR); } const struct inode_operations ceph_file_iops = { @@ -1201,7 +1207,7 @@ static void update_dentry_lease_careful(struct dentry *dentry, /* * splice a dentry to an inode. - * caller must hold directory i_mutex for this to be safe. + * caller must hold directory i_rwsem for this to be safe. */ static int splice_dentry(struct dentry **pdn, struct inode *in) { @@ -1598,7 +1604,7 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn, return idx == 0 ? -ENOMEM : 0; } /* reading/filling the cache are serialized by - * i_mutex, no need to use page lock */ + * i_rwsem, no need to use page lock */ unlock_page(ctl->page); ctl->dentries = kmap(ctl->page); if (idx == 0) @@ -2301,6 +2307,57 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page, return err; } +int ceph_do_getvxattr(struct inode *inode, const char *name, void *value, + size_t size) +{ + struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); + struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_mds_request *req; + int mode = USE_AUTH_MDS; + int err; + char *xattr_value; + size_t xattr_value_len; + + req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETVXATTR, mode); + if (IS_ERR(req)) { + err = -ENOMEM; + goto out; + } + + req->r_path2 = kstrdup(name, GFP_NOFS); + if (!req->r_path2) { + err = -ENOMEM; + goto put; + } + + ihold(inode); + req->r_inode = inode; + err = ceph_mdsc_do_request(mdsc, NULL, req); + if (err < 0) + goto put; + + xattr_value = req->r_reply_info.xattr_info.xattr_value; + xattr_value_len = req->r_reply_info.xattr_info.xattr_value_len; + + dout("do_getvxattr xattr_value_len:%zu, size:%zu\n", xattr_value_len, size); + + err = (int)xattr_value_len; + if (size == 0) + goto put; + + if (xattr_value_len > size) { + err = -ERANGE; + goto put; + } + + memcpy(value, xattr_value, xattr_value_len); +put: + ceph_mdsc_put_request(req); +out: + dout("do_getvxattr result=%d\n", err); + return err; +} + /* * Check inode permissions. We verify we have a valid value for diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c index d1f154aec249bf3d38fb8d10dd66c311065fdb68..3e2843e86e2744486075519dfd87f9d39edc9ced 100644 --- a/fs/ceph/locks.c +++ b/fs/ceph/locks.c @@ -111,10 +111,10 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode, req->r_args.filelock_change.length = cpu_to_le64(length); req->r_args.filelock_change.wait = wait; - if (wait) - req->r_wait_for_completion = ceph_lock_wait_for_completion; - - err = ceph_mdsc_do_request(mdsc, inode, req); + err = ceph_mdsc_submit_request(mdsc, inode, req); + if (!err) + err = ceph_mdsc_wait_request(mdsc, req, wait ? + ceph_lock_wait_for_completion : NULL); if (!err && operation == CEPH_MDS_OP_GETFILELOCK) { fl->fl_pid = -le64_to_cpu(req->r_reply_info.filelock_reply->pid); if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index c30eefc0ac19346e086128e3872a8d1848ea1dc2..fa38c013126d4fcaa89c8284175284b66f3b7fcc 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -555,6 +555,28 @@ static int parse_reply_info_create(void **p, void *end, return -EIO; } +static int parse_reply_info_getvxattr(void **p, void *end, + struct ceph_mds_reply_info_parsed *info, + u64 features) +{ + u32 value_len; + + ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */ + ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */ + ceph_decode_skip_32(p, end, bad); /* skip payload length */ + + ceph_decode_32_safe(p, end, value_len, bad); + + if (value_len == end - *p) { + info->xattr_info.xattr_value = *p; + info->xattr_info.xattr_value_len = value_len; + *p = end; + return value_len; + } +bad: + return -EIO; +} + /* * parse extra results */ @@ -570,6 +592,8 @@ static int parse_reply_info_extra(void **p, void *end, return parse_reply_info_readdir(p, end, info, features); else if (op == CEPH_MDS_OP_CREATE) return parse_reply_info_create(p, end, info, features, s); + else if (op == CEPH_MDS_OP_GETVXATTR) + return parse_reply_info_getvxattr(p, end, info, features); else return -EIO; } @@ -2178,7 +2202,8 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, order = get_order(size * num_entries); while (order >= 0) { rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | - __GFP_NOWARN, + __GFP_NOWARN | + __GFP_ZERO, order); if (rinfo->dir_entries) break; @@ -2946,15 +2971,16 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, return err; } -static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, - struct ceph_mds_request *req) +int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req, + ceph_mds_request_wait_callback_t wait_func) { int err; /* wait */ dout("do_request waiting\n"); - if (!req->r_timeout && req->r_wait_for_completion) { - err = req->r_wait_for_completion(mdsc, req); + if (wait_func) { + err = wait_func(mdsc, req); } else { long timeleft = wait_for_completion_killable_timeout( &req->r_completion, @@ -3011,7 +3037,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, /* issue */ err = ceph_mdsc_submit_request(mdsc, dir, req); if (!err) - err = ceph_mdsc_wait_request(mdsc, req); + err = ceph_mdsc_wait_request(mdsc, req, NULL); dout("do_request %p done, result %d\n", req, err); return err; } @@ -3097,35 +3123,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) result = le32_to_cpu(head->result); - /* - * Handle an ESTALE - * if we're not talking to the authority, send to them - * if the authority has changed while we weren't looking, - * send to new authority - * Otherwise we just have to return an ESTALE - */ - if (result == -ESTALE) { - dout("got ESTALE on request %llu\n", req->r_tid); - req->r_resend_mds = -1; - if (req->r_direct_mode != USE_AUTH_MDS) { - dout("not using auth, setting for that now\n"); - req->r_direct_mode = USE_AUTH_MDS; - __do_request(mdsc, req); - mutex_unlock(&mdsc->mutex); - goto out; - } else { - int mds = __choose_mds(mdsc, req, NULL); - if (mds >= 0 && mds != req->r_session->s_mds) { - dout("but auth changed, so resending\n"); - __do_request(mdsc, req); - mutex_unlock(&mdsc->mutex); - goto out; - } - } - dout("have to return ESTALE on request %llu\n", req->r_tid); - } - - if (head->safe) { set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); __unregister_request(mdsc, req); @@ -4841,7 +4838,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) mutex_unlock(&mdsc->mutex); ceph_cleanup_snapid_map(mdsc); - ceph_cleanup_empty_realms(mdsc); + ceph_cleanup_global_and_empty_realms(mdsc); cancel_work_sync(&mdsc->cap_reclaim_work); cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 97c7f7bfa55f39ba67ae1f733791148142406241..33497846e47e81639976856e2a3896c85931f0d4 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -100,6 +100,11 @@ struct ceph_mds_reply_dir_entry { loff_t offset; }; +struct ceph_mds_reply_xattr { + char *xattr_value; + size_t xattr_value_len; +}; + /* * parsed info about an mds reply, including information about * either: 1) the target inode and/or its parent directory and dentry, @@ -115,6 +120,7 @@ struct ceph_mds_reply_info_parsed { char *dname; u32 dname_len; struct ceph_mds_reply_lease *dlease; + struct ceph_mds_reply_xattr xattr_info; /* extra */ union { @@ -274,8 +280,8 @@ struct ceph_mds_request { union ceph_mds_request_args r_args; int r_fmode; /* file mode, if expecting cap */ - const struct cred *r_cred; int r_request_release_offset; + const struct cred *r_cred; struct timespec64 r_stamp; /* for choosing which mds to send this request to */ @@ -296,12 +302,11 @@ struct ceph_mds_request { struct ceph_msg *r_reply; struct ceph_mds_reply_info_parsed r_reply_info; int r_err; - + u32 r_readdir_offset; struct page *r_locked_page; int r_dir_caps; int r_num_caps; - u32 r_readdir_offset; unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */ unsigned long r_started; /* start time to measure timeout against */ @@ -329,7 +334,6 @@ struct ceph_mds_request { struct completion r_completion; struct completion r_safe_completion; ceph_mds_request_callback_t r_callback; - ceph_mds_request_wait_callback_t r_wait_for_completion; struct list_head r_unsafe_item; /* per-session unsafe list item */ long long r_dir_release_cnt; @@ -507,6 +511,9 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode); extern int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, struct ceph_mds_request *req); +int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req, + ceph_mds_request_wait_callback_t wait_func); extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct inode *dir, struct ceph_mds_request *req); diff --git a/fs/ceph/metric.c b/fs/ceph/metric.c index 0fcba68f9a9903164a6a501d6a768c2183870429..c47347d2e84e3227d1f2f5383c14a2e0126bc648 100644 --- a/fs/ceph/metric.c +++ b/fs/ceph/metric.c @@ -8,6 +8,12 @@ #include "metric.h" #include "mds_client.h" +static void ktime_to_ceph_timespec(struct ceph_timespec *ts, ktime_t val) +{ + struct timespec64 t = ktime_to_timespec64(val); + ceph_encode_timespec64(ts, &t); +} + static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, struct ceph_mds_session *s) { @@ -26,7 +32,6 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, u64 nr_caps = atomic64_read(&m->total_caps); u32 header_len = sizeof(struct ceph_metric_header); struct ceph_msg *msg; - struct timespec64 ts; s64 sum; s32 items = 0; s32 len; @@ -59,37 +64,40 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, /* encode the read latency metric */ read = (struct ceph_metric_read_latency *)(cap + 1); read->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY); - read->header.ver = 1; + read->header.ver = 2; read->header.compat = 1; read->header.data_len = cpu_to_le32(sizeof(*read) - header_len); sum = m->metric[METRIC_READ].latency_sum; - jiffies_to_timespec64(sum, &ts); - read->sec = cpu_to_le32(ts.tv_sec); - read->nsec = cpu_to_le32(ts.tv_nsec); + ktime_to_ceph_timespec(&read->lat, sum); + ktime_to_ceph_timespec(&read->avg, m->metric[METRIC_READ].latency_avg); + read->sq_sum = cpu_to_le64(m->metric[METRIC_READ].latency_sq_sum); + read->count = cpu_to_le64(m->metric[METRIC_READ].total); items++; /* encode the write latency metric */ write = (struct ceph_metric_write_latency *)(read + 1); write->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY); - write->header.ver = 1; + write->header.ver = 2; write->header.compat = 1; write->header.data_len = cpu_to_le32(sizeof(*write) - header_len); sum = m->metric[METRIC_WRITE].latency_sum; - jiffies_to_timespec64(sum, &ts); - write->sec = cpu_to_le32(ts.tv_sec); - write->nsec = cpu_to_le32(ts.tv_nsec); + ktime_to_ceph_timespec(&write->lat, sum); + ktime_to_ceph_timespec(&write->avg, m->metric[METRIC_WRITE].latency_avg); + write->sq_sum = cpu_to_le64(m->metric[METRIC_WRITE].latency_sq_sum); + write->count = cpu_to_le64(m->metric[METRIC_WRITE].total); items++; /* encode the metadata latency metric */ meta = (struct ceph_metric_metadata_latency *)(write + 1); meta->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY); - meta->header.ver = 1; + meta->header.ver = 2; meta->header.compat = 1; meta->header.data_len = cpu_to_le32(sizeof(*meta) - header_len); sum = m->metric[METRIC_METADATA].latency_sum; - jiffies_to_timespec64(sum, &ts); - meta->sec = cpu_to_le32(ts.tv_sec); - meta->nsec = cpu_to_le32(ts.tv_nsec); + ktime_to_ceph_timespec(&meta->lat, sum); + ktime_to_ceph_timespec(&meta->avg, m->metric[METRIC_METADATA].latency_avg); + meta->sq_sum = cpu_to_le64(m->metric[METRIC_METADATA].latency_sq_sum); + meta->count = cpu_to_le64(m->metric[METRIC_METADATA].total); items++; /* encode the dentry lease metric */ @@ -250,6 +258,7 @@ int ceph_metric_init(struct ceph_client_metric *m) metric->size_max = 0; metric->total = 0; metric->latency_sum = 0; + metric->latency_avg = 0; metric->latency_sq_sum = 0; metric->latency_min = KTIME_MAX; metric->latency_max = 0; @@ -307,20 +316,19 @@ void ceph_metric_destroy(struct ceph_client_metric *m) max = new; \ } -static inline void __update_stdev(ktime_t total, ktime_t lsum, - ktime_t *sq_sump, ktime_t lat) +static inline void __update_mean_and_stdev(ktime_t total, ktime_t *lavg, + ktime_t *sq_sump, ktime_t lat) { - ktime_t avg, sq; - - if (unlikely(total == 1)) - return; - - /* the sq is (lat - old_avg) * (lat - new_avg) */ - avg = DIV64_U64_ROUND_CLOSEST((lsum - lat), (total - 1)); - sq = lat - avg; - avg = DIV64_U64_ROUND_CLOSEST(lsum, total); - sq = sq * (lat - avg); - *sq_sump += sq; + ktime_t avg; + + if (unlikely(total == 1)) { + *lavg = lat; + } else { + /* the sq is (lat - old_avg) * (lat - new_avg) */ + avg = *lavg + div64_s64(lat - *lavg, total); + *sq_sump += (lat - *lavg)*(lat - avg); + *lavg = avg; + } } void ceph_update_metrics(struct ceph_metric *m, @@ -339,6 +347,7 @@ void ceph_update_metrics(struct ceph_metric *m, METRIC_UPDATE_MIN_MAX(m->size_min, m->size_max, size); m->latency_sum += lat; METRIC_UPDATE_MIN_MAX(m->latency_min, m->latency_max, lat); - __update_stdev(total, m->latency_sum, &m->latency_sq_sum, lat); + __update_mean_and_stdev(total, &m->latency_avg, &m->latency_sq_sum, + lat); spin_unlock(&m->lock); } diff --git a/fs/ceph/metric.h b/fs/ceph/metric.h index bb45608181e7f2678367a6bf5167ba3f0b3715e8..0d0c44bd3332b920517f8338a158d0ae3068935d 100644 --- a/fs/ceph/metric.h +++ b/fs/ceph/metric.h @@ -2,7 +2,7 @@ #ifndef _FS_CEPH_MDS_METRIC_H #define _FS_CEPH_MDS_METRIC_H -#include +#include #include #include @@ -19,27 +19,39 @@ enum ceph_metric_type { CLIENT_METRIC_TYPE_OPENED_INODES, CLIENT_METRIC_TYPE_READ_IO_SIZES, CLIENT_METRIC_TYPE_WRITE_IO_SIZES, - - CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_WRITE_IO_SIZES, + CLIENT_METRIC_TYPE_AVG_READ_LATENCY, + CLIENT_METRIC_TYPE_STDEV_READ_LATENCY, + CLIENT_METRIC_TYPE_AVG_WRITE_LATENCY, + CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY, + CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY, + CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, + + CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, }; /* * This will always have the highest metric bit value * as the last element of the array. */ -#define CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED { \ - CLIENT_METRIC_TYPE_CAP_INFO, \ - CLIENT_METRIC_TYPE_READ_LATENCY, \ - CLIENT_METRIC_TYPE_WRITE_LATENCY, \ - CLIENT_METRIC_TYPE_METADATA_LATENCY, \ - CLIENT_METRIC_TYPE_DENTRY_LEASE, \ - CLIENT_METRIC_TYPE_OPENED_FILES, \ - CLIENT_METRIC_TYPE_PINNED_ICAPS, \ - CLIENT_METRIC_TYPE_OPENED_INODES, \ - CLIENT_METRIC_TYPE_READ_IO_SIZES, \ - CLIENT_METRIC_TYPE_WRITE_IO_SIZES, \ - \ - CLIENT_METRIC_TYPE_MAX, \ +#define CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED { \ + CLIENT_METRIC_TYPE_CAP_INFO, \ + CLIENT_METRIC_TYPE_READ_LATENCY, \ + CLIENT_METRIC_TYPE_WRITE_LATENCY, \ + CLIENT_METRIC_TYPE_METADATA_LATENCY, \ + CLIENT_METRIC_TYPE_DENTRY_LEASE, \ + CLIENT_METRIC_TYPE_OPENED_FILES, \ + CLIENT_METRIC_TYPE_PINNED_ICAPS, \ + CLIENT_METRIC_TYPE_OPENED_INODES, \ + CLIENT_METRIC_TYPE_READ_IO_SIZES, \ + CLIENT_METRIC_TYPE_WRITE_IO_SIZES, \ + CLIENT_METRIC_TYPE_AVG_READ_LATENCY, \ + CLIENT_METRIC_TYPE_STDEV_READ_LATENCY, \ + CLIENT_METRIC_TYPE_AVG_WRITE_LATENCY, \ + CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY, \ + CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY, \ + CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, \ + \ + CLIENT_METRIC_TYPE_MAX, \ } struct ceph_metric_header { @@ -60,22 +72,28 @@ struct ceph_metric_cap { /* metric read latency header */ struct ceph_metric_read_latency { struct ceph_metric_header header; - __le32 sec; - __le32 nsec; + struct ceph_timespec lat; + struct ceph_timespec avg; + __le64 sq_sum; + __le64 count; } __packed; /* metric write latency header */ struct ceph_metric_write_latency { struct ceph_metric_header header; - __le32 sec; - __le32 nsec; + struct ceph_timespec lat; + struct ceph_timespec avg; + __le64 sq_sum; + __le64 count; } __packed; /* metric metadata latency header */ struct ceph_metric_metadata_latency { struct ceph_metric_header header; - __le32 sec; - __le32 nsec; + struct ceph_timespec lat; + struct ceph_timespec avg; + __le64 sq_sum; + __le64 count; } __packed; /* metric dentry lease header */ @@ -140,6 +158,7 @@ struct ceph_metric { u64 size_min; u64 size_max; ktime_t latency_sum; + ktime_t latency_avg; ktime_t latency_sq_sum; ktime_t latency_min; ktime_t latency_max; diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index b41e6724c5910418a35523eebf1c489288fd35a4..322ee5add94261838fd95e2c06caf7fb72ab23c4 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -121,18 +121,23 @@ static struct ceph_snap_realm *ceph_create_snap_realm( if (!realm) return ERR_PTR(-ENOMEM); - atomic_set(&realm->nref, 1); /* for caller */ + /* Do not release the global dummy snaprealm until unmouting */ + if (ino == CEPH_INO_GLOBAL_SNAPREALM) + atomic_set(&realm->nref, 2); + else + atomic_set(&realm->nref, 1); realm->ino = ino; INIT_LIST_HEAD(&realm->children); INIT_LIST_HEAD(&realm->child_item); INIT_LIST_HEAD(&realm->empty_item); INIT_LIST_HEAD(&realm->dirty_item); + INIT_LIST_HEAD(&realm->rebuild_item); INIT_LIST_HEAD(&realm->inodes_with_caps); spin_lock_init(&realm->inodes_with_caps_lock); __insert_snap_realm(&mdsc->snap_realms, realm); mdsc->num_snap_realms++; - dout("create_snap_realm %llx %p\n", realm->ino, realm); + dout("%s %llx %p\n", __func__, realm->ino, realm); return realm; } @@ -156,7 +161,7 @@ static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc, else if (ino > r->ino) n = n->rb_right; else { - dout("lookup_snap_realm %llx %p\n", r->ino, r); + dout("%s %llx %p\n", __func__, r->ino, r); return r; } } @@ -184,7 +189,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc, { lockdep_assert_held_write(&mdsc->snap_rwsem); - dout("__destroy_snap_realm %p %llx\n", realm, realm->ino); + dout("%s %p %llx\n", __func__, realm, realm->ino); rb_erase(&realm->node, &mdsc->snap_realms); mdsc->num_snap_realms--; @@ -260,9 +265,14 @@ static void __cleanup_empty_realms(struct ceph_mds_client *mdsc) spin_unlock(&mdsc->snap_empty_lock); } -void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc) +void ceph_cleanup_global_and_empty_realms(struct ceph_mds_client *mdsc) { + struct ceph_snap_realm *global_realm; + down_write(&mdsc->snap_rwsem); + global_realm = __lookup_snap_realm(mdsc, CEPH_INO_GLOBAL_SNAPREALM); + if (global_realm) + ceph_put_snap_realm(mdsc, global_realm); __cleanup_empty_realms(mdsc); up_write(&mdsc->snap_rwsem); } @@ -292,9 +302,8 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc, if (IS_ERR(parent)) return PTR_ERR(parent); } - dout("adjust_snap_realm_parent %llx %p: %llx %p -> %llx %p\n", - realm->ino, realm, realm->parent_ino, realm->parent, - parentino, parent); + dout("%s %llx %p: %llx %p -> %llx %p\n", __func__, realm->ino, + realm, realm->parent_ino, realm->parent, parentino, parent); if (realm->parent) { list_del_init(&realm->child_item); ceph_put_snap_realm(mdsc, realm->parent); @@ -320,7 +329,8 @@ static int cmpu64_rev(const void *a, const void *b) * build the snap context for a given realm. */ static int build_snap_context(struct ceph_snap_realm *realm, - struct list_head* dirty_realms) + struct list_head *realm_queue, + struct list_head *dirty_realms) { struct ceph_snap_realm *parent = realm->parent; struct ceph_snap_context *snapc; @@ -334,9 +344,9 @@ static int build_snap_context(struct ceph_snap_realm *realm, */ if (parent) { if (!parent->cached_context) { - err = build_snap_context(parent, dirty_realms); - if (err) - goto fail; + /* add to the queue head */ + list_add(&parent->rebuild_item, realm_queue); + return 1; } num += parent->cached_context->num_snaps; } @@ -349,9 +359,8 @@ static int build_snap_context(struct ceph_snap_realm *realm, realm->cached_context->seq == realm->seq && (!parent || realm->cached_context->seq >= parent->cached_context->seq)) { - dout("build_snap_context %llx %p: %p seq %lld (%u snaps)" - " (unchanged)\n", - realm->ino, realm, realm->cached_context, + dout("%s %llx %p: %p seq %lld (%u snaps) (unchanged)\n", + __func__, realm->ino, realm, realm->cached_context, realm->cached_context->seq, (unsigned int)realm->cached_context->num_snaps); return 0; @@ -390,9 +399,8 @@ static int build_snap_context(struct ceph_snap_realm *realm, sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL); snapc->num_snaps = num; - dout("build_snap_context %llx %p: %p seq %lld (%u snaps)\n", - realm->ino, realm, snapc, snapc->seq, - (unsigned int) snapc->num_snaps); + dout("%s %llx %p: %p seq %lld (%u snaps)\n", __func__, realm->ino, + realm, snapc, snapc->seq, (unsigned int) snapc->num_snaps); ceph_put_snap_context(realm->cached_context); realm->cached_context = snapc; @@ -409,8 +417,7 @@ static int build_snap_context(struct ceph_snap_realm *realm, ceph_put_snap_context(realm->cached_context); realm->cached_context = NULL; } - pr_err("build_snap_context %llx %p fail %d\n", realm->ino, - realm, err); + pr_err("%s %llx %p fail %d\n", __func__, realm->ino, realm, err); return err; } @@ -420,13 +427,50 @@ static int build_snap_context(struct ceph_snap_realm *realm, static void rebuild_snap_realms(struct ceph_snap_realm *realm, struct list_head *dirty_realms) { - struct ceph_snap_realm *child; + LIST_HEAD(realm_queue); + int last = 0; + bool skip = false; + + list_add_tail(&realm->rebuild_item, &realm_queue); + + while (!list_empty(&realm_queue)) { + struct ceph_snap_realm *_realm, *child; + + _realm = list_first_entry(&realm_queue, + struct ceph_snap_realm, + rebuild_item); + + /* + * If the last building failed dues to memory + * issue, just empty the realm_queue and return + * to avoid infinite loop. + */ + if (last < 0) { + list_del_init(&_realm->rebuild_item); + continue; + } + + last = build_snap_context(_realm, &realm_queue, dirty_realms); + dout("%s %llx %p, %s\n", __func__, _realm->ino, _realm, + last > 0 ? "is deferred" : !last ? "succeeded" : "failed"); + + /* is any child in the list ? */ + list_for_each_entry(child, &_realm->children, child_item) { + if (!list_empty(&child->rebuild_item)) { + skip = true; + break; + } + } - dout("rebuild_snap_realms %llx %p\n", realm->ino, realm); - build_snap_context(realm, dirty_realms); + if (!skip) { + list_for_each_entry(child, &_realm->children, child_item) + list_add_tail(&child->rebuild_item, &realm_queue); + } - list_for_each_entry(child, &realm->children, child_item) - rebuild_snap_realms(child, dirty_realms); + /* last == 1 means need to build parent first */ + if (last <= 0) + list_del_init(&_realm->rebuild_item); + } } @@ -474,23 +518,15 @@ static bool has_new_snaps(struct ceph_snap_context *o, * Caller must hold snap_rwsem for read (i.e., the realm topology won't * change). */ -static void ceph_queue_cap_snap(struct ceph_inode_info *ci) +static void ceph_queue_cap_snap(struct ceph_inode_info *ci, + struct ceph_cap_snap **pcapsnap) { struct inode *inode = &ci->vfs_inode; - struct ceph_cap_snap *capsnap; struct ceph_snap_context *old_snapc, *new_snapc; + struct ceph_cap_snap *capsnap = *pcapsnap; struct ceph_buffer *old_blob = NULL; int used, dirty; - capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS); - if (!capsnap) { - pr_err("ENOMEM allocating ceph_cap_snap on %p\n", inode); - return; - } - capsnap->cap_flush.is_capsnap = true; - INIT_LIST_HEAD(&capsnap->cap_flush.i_list); - INIT_LIST_HEAD(&capsnap->cap_flush.g_list); - spin_lock(&ci->i_ceph_lock); used = __ceph_caps_used(ci); dirty = __ceph_caps_dirty(ci); @@ -511,12 +547,14 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci) as no new writes are allowed to start when pending, so any writes in progress now were started before the previous cap_snap. lucky us. */ - dout("queue_cap_snap %p already pending\n", inode); + dout("%s %p %llx.%llx already pending\n", + __func__, inode, ceph_vinop(inode)); goto update_snapc; } if (ci->i_wrbuffer_ref_head == 0 && !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) { - dout("queue_cap_snap %p nothing dirty|writing\n", inode); + dout("%s %p %llx.%llx nothing dirty|writing\n", + __func__, inode, ceph_vinop(inode)); goto update_snapc; } @@ -536,20 +574,17 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci) } else { if (!(used & CEPH_CAP_FILE_WR) && ci->i_wrbuffer_ref_head == 0) { - dout("queue_cap_snap %p " - "no new_snap|dirty_page|writing\n", inode); + dout("%s %p %llx.%llx no new_snap|dirty_page|writing\n", + __func__, inode, ceph_vinop(inode)); goto update_snapc; } } - dout("queue_cap_snap %p cap_snap %p queuing under %p %s %s\n", - inode, capsnap, old_snapc, ceph_cap_string(dirty), - capsnap->need_flush ? "" : "no_flush"); + dout("%s %p %llx.%llx cap_snap %p queuing under %p %s %s\n", + __func__, inode, ceph_vinop(inode), capsnap, old_snapc, + ceph_cap_string(dirty), capsnap->need_flush ? "" : "no_flush"); ihold(inode); - refcount_set(&capsnap->nref, 1); - INIT_LIST_HEAD(&capsnap->ci_item); - capsnap->follows = old_snapc->seq; capsnap->issued = __ceph_caps_issued(ci, NULL); capsnap->dirty = dirty; @@ -579,31 +614,30 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci) list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps); if (used & CEPH_CAP_FILE_WR) { - dout("queue_cap_snap %p cap_snap %p snapc %p" - " seq %llu used WR, now pending\n", inode, + dout("%s %p %llx.%llx cap_snap %p snapc %p seq %llu used WR," + " now pending\n", __func__, inode, ceph_vinop(inode), capsnap, old_snapc, old_snapc->seq); capsnap->writing = 1; } else { /* note mtime, size NOW. */ __ceph_finish_cap_snap(ci, capsnap); } - capsnap = NULL; + *pcapsnap = NULL; old_snapc = NULL; update_snapc: - if (ci->i_wrbuffer_ref_head == 0 && - ci->i_wr_ref == 0 && - ci->i_dirty_caps == 0 && - ci->i_flushing_caps == 0) { - ci->i_head_snapc = NULL; - } else { + if (ci->i_wrbuffer_ref_head == 0 && + ci->i_wr_ref == 0 && + ci->i_dirty_caps == 0 && + ci->i_flushing_caps == 0) { + ci->i_head_snapc = NULL; + } else { ci->i_head_snapc = ceph_get_snap_context(new_snapc); dout(" new snapc is %p\n", new_snapc); } spin_unlock(&ci->i_ceph_lock); ceph_buffer_put(old_blob); - kfree(capsnap); ceph_put_snap_context(old_snapc); } @@ -632,27 +666,28 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci, capsnap->truncate_size = ci->i_truncate_size; capsnap->truncate_seq = ci->i_truncate_seq; if (capsnap->dirty_pages) { - dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu " - "still has %d dirty pages\n", inode, capsnap, - capsnap->context, capsnap->context->seq, - ceph_cap_string(capsnap->dirty), capsnap->size, - capsnap->dirty_pages); + dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu " + "still has %d dirty pages\n", __func__, inode, + ceph_vinop(inode), capsnap, capsnap->context, + capsnap->context->seq, ceph_cap_string(capsnap->dirty), + capsnap->size, capsnap->dirty_pages); return 0; } /* Fb cap still in use, delay it */ if (ci->i_wb_ref) { - dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu " - "used WRBUFFER, delaying\n", inode, capsnap, - capsnap->context, capsnap->context->seq, - ceph_cap_string(capsnap->dirty), capsnap->size); + dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu " + "used WRBUFFER, delaying\n", __func__, inode, + ceph_vinop(inode), capsnap, capsnap->context, + capsnap->context->seq, ceph_cap_string(capsnap->dirty), + capsnap->size); capsnap->writing = 1; return 0; } ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; - dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n", - inode, capsnap, capsnap->context, + dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu\n", + __func__, inode, ceph_vinop(inode), capsnap, capsnap->context, capsnap->context->seq, ceph_cap_string(capsnap->dirty), capsnap->size); @@ -671,8 +706,9 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm) { struct ceph_inode_info *ci; struct inode *lastinode = NULL; + struct ceph_cap_snap *capsnap = NULL; - dout("queue_realm_cap_snaps %p %llx inodes\n", realm, realm->ino); + dout("%s %p %llx inode\n", __func__, realm, realm->ino); spin_lock(&realm->inodes_with_caps_lock); list_for_each_entry(ci, &realm->inodes_with_caps, i_snap_realm_item) { @@ -682,13 +718,35 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm) spin_unlock(&realm->inodes_with_caps_lock); iput(lastinode); lastinode = inode; - ceph_queue_cap_snap(ci); + + /* + * Allocate the capsnap memory outside of ceph_queue_cap_snap() + * to reduce very possible but unnecessary frequently memory + * allocate/free in this loop. + */ + if (!capsnap) { + capsnap = kmem_cache_zalloc(ceph_cap_snap_cachep, GFP_NOFS); + if (!capsnap) { + pr_err("ENOMEM allocating ceph_cap_snap on %p\n", + inode); + return; + } + } + capsnap->cap_flush.is_capsnap = true; + refcount_set(&capsnap->nref, 1); + INIT_LIST_HEAD(&capsnap->cap_flush.i_list); + INIT_LIST_HEAD(&capsnap->cap_flush.g_list); + INIT_LIST_HEAD(&capsnap->ci_item); + + ceph_queue_cap_snap(ci, &capsnap); spin_lock(&realm->inodes_with_caps_lock); } spin_unlock(&realm->inodes_with_caps_lock); iput(lastinode); - dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino); + if (capsnap) + kmem_cache_free(ceph_cap_snap_cachep, capsnap); + dout("%s %p %llx done\n", __func__, realm, realm->ino); } /* @@ -707,14 +765,16 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, __le64 *prior_parent_snaps; /* encoded */ struct ceph_snap_realm *realm = NULL; struct ceph_snap_realm *first_realm = NULL; - int invalidate = 0; + struct ceph_snap_realm *realm_to_rebuild = NULL; + int rebuild_snapcs; int err = -ENOMEM; LIST_HEAD(dirty_realms); lockdep_assert_held_write(&mdsc->snap_rwsem); - dout("update_snap_trace deletion=%d\n", deletion); + dout("%s deletion=%d\n", __func__, deletion); more: + rebuild_snapcs = 0; ceph_decode_need(&p, e, sizeof(*ri), bad); ri = p; p += sizeof(*ri); @@ -738,10 +798,10 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent)); if (err < 0) goto fail; - invalidate += err; + rebuild_snapcs += err; if (le64_to_cpu(ri->seq) > realm->seq) { - dout("update_snap_trace updating %llx %p %lld -> %lld\n", + dout("%s updating %llx %p %lld -> %lld\n", __func__, realm->ino, realm, realm->seq, le64_to_cpu(ri->seq)); /* update realm parameters, snap lists */ realm->seq = le64_to_cpu(ri->seq); @@ -763,22 +823,30 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, if (realm->seq > mdsc->last_snap_seq) mdsc->last_snap_seq = realm->seq; - invalidate = 1; + rebuild_snapcs = 1; } else if (!realm->cached_context) { - dout("update_snap_trace %llx %p seq %lld new\n", + dout("%s %llx %p seq %lld new\n", __func__, realm->ino, realm, realm->seq); - invalidate = 1; + rebuild_snapcs = 1; } else { - dout("update_snap_trace %llx %p seq %lld unchanged\n", + dout("%s %llx %p seq %lld unchanged\n", __func__, realm->ino, realm, realm->seq); } - dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino, - realm, invalidate, p, e); + dout("done with %llx %p, rebuild_snapcs=%d, %p %p\n", realm->ino, + realm, rebuild_snapcs, p, e); + + /* + * this will always track the uppest parent realm from which + * we need to rebuild the snapshot contexts _downward_ in + * hierarchy. + */ + if (rebuild_snapcs) + realm_to_rebuild = realm; - /* invalidate when we reach the _end_ (root) of the trace */ - if (invalidate && p >= e) - rebuild_snap_realms(realm, &dirty_realms); + /* rebuild_snapcs when we reach the _end_ (root) of the trace */ + if (realm_to_rebuild && p >= e) + rebuild_snap_realms(realm_to_rebuild, &dirty_realms); if (!first_realm) first_realm = realm; @@ -814,7 +882,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, ceph_put_snap_realm(mdsc, realm); if (first_realm) ceph_put_snap_realm(mdsc, first_realm); - pr_err("update_snap_trace error %d\n", err); + pr_err("%s error %d\n", __func__, err); return err; } @@ -831,7 +899,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc) struct inode *inode; struct ceph_mds_session *session = NULL; - dout("flush_snaps\n"); + dout("%s\n", __func__); spin_lock(&mdsc->snap_flush_lock); while (!list_empty(&mdsc->snap_flush_list)) { ci = list_first_entry(&mdsc->snap_flush_list, @@ -846,7 +914,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc) spin_unlock(&mdsc->snap_flush_lock); ceph_put_mds_session(session); - dout("flush_snaps done\n"); + dout("%s done\n", __func__); } /** @@ -928,8 +996,8 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, trace_len = le32_to_cpu(h->trace_len); p += sizeof(*h); - dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds, - ceph_snap_op_name(op), split, trace_len); + dout("%s from mds%d op %s split %llx tracelen %d\n", __func__, + mds, ceph_snap_op_name(op), split, trace_len); mutex_lock(&session->s_mutex); inc_session_sequence(session); @@ -989,13 +1057,13 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, */ if (ci->i_snap_realm->created > le64_to_cpu(ri->created)) { - dout(" leaving %p in newer realm %llx %p\n", - inode, ci->i_snap_realm->ino, + dout(" leaving %p %llx.%llx in newer realm %llx %p\n", + inode, ceph_vinop(inode), ci->i_snap_realm->ino, ci->i_snap_realm); goto skip_inode; } - dout(" will move %p to split realm %llx %p\n", - inode, realm->ino, realm); + dout(" will move %p %llx.%llx to split realm %llx %p\n", + inode, ceph_vinop(inode), realm->ino, realm); ceph_get_snap_realm(mdsc, realm); ceph_change_snap_realm(inode, realm); @@ -1038,7 +1106,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, return; bad: - pr_err("corrupt snap message from mds%d\n", mds); + pr_err("%s corrupt snap message from mds%d\n", __func__, mds); ceph_msg_dump(msg); out: if (locked_rwsem) @@ -1071,7 +1139,8 @@ struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc, } spin_unlock(&mdsc->snapid_map_lock); if (exist) { - dout("found snapid map %llx -> %x\n", exist->snap, exist->dev); + dout("%s found snapid map %llx -> %x\n", __func__, + exist->snap, exist->dev); return exist; } @@ -1115,11 +1184,13 @@ struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc, if (exist) { free_anon_bdev(sm->dev); kfree(sm); - dout("found snapid map %llx -> %x\n", exist->snap, exist->dev); + dout("%s found snapid map %llx -> %x\n", __func__, + exist->snap, exist->dev); return exist; } - dout("create snapid map %llx -> %x\n", sm->snap, sm->dev); + dout("%s create snapid map %llx -> %x\n", __func__, + sm->snap, sm->dev); return sm; } diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c index 573bb9556fb56cbf16802d7770fcf22305bc5469..e36e8948e7285b66127470ee82fa95c2bfce08e8 100644 --- a/fs/ceph/strings.c +++ b/fs/ceph/strings.c @@ -60,6 +60,7 @@ const char *ceph_mds_op_name(int op) case CEPH_MDS_OP_LOOKUPINO: return "lookupino"; case CEPH_MDS_OP_LOOKUPNAME: return "lookupname"; case CEPH_MDS_OP_GETATTR: return "getattr"; + case CEPH_MDS_OP_GETVXATTR: return "getvxattr"; case CEPH_MDS_OP_SETXATTR: return "setxattr"; case CEPH_MDS_OP_SETATTR: return "setattr"; case CEPH_MDS_OP_RMXATTR: return "rmxattr"; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 4a3b77d049c76e588151a5fb5054272271aa711a..e6987d295079f3a882dea3715daad55f985dec55 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -865,6 +865,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) */ struct kmem_cache *ceph_inode_cachep; struct kmem_cache *ceph_cap_cachep; +struct kmem_cache *ceph_cap_snap_cachep; struct kmem_cache *ceph_cap_flush_cachep; struct kmem_cache *ceph_dentry_cachep; struct kmem_cache *ceph_file_cachep; @@ -893,6 +894,9 @@ static int __init init_caches(void) ceph_cap_cachep = KMEM_CACHE(ceph_cap, SLAB_MEM_SPREAD); if (!ceph_cap_cachep) goto bad_cap; + ceph_cap_snap_cachep = KMEM_CACHE(ceph_cap_snap, SLAB_MEM_SPREAD); + if (!ceph_cap_snap_cachep) + goto bad_cap_snap; ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush, SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); if (!ceph_cap_flush_cachep) @@ -932,6 +936,8 @@ static int __init init_caches(void) bad_dentry: kmem_cache_destroy(ceph_cap_flush_cachep); bad_cap_flush: + kmem_cache_destroy(ceph_cap_snap_cachep); +bad_cap_snap: kmem_cache_destroy(ceph_cap_cachep); bad_cap: kmem_cache_destroy(ceph_inode_cachep); @@ -948,6 +954,7 @@ static void destroy_caches(void) kmem_cache_destroy(ceph_inode_cachep); kmem_cache_destroy(ceph_cap_cachep); + kmem_cache_destroy(ceph_cap_snap_cachep); kmem_cache_destroy(ceph_cap_flush_cachep); kmem_cache_destroy(ceph_dentry_cachep); kmem_cache_destroy(ceph_file_cachep); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 0bd97aea231910a20c53bc1dfc9488b939ebcafa..a1ecc410a4952f99bdf7e171626543e05e4203ea 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -231,7 +231,7 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) if (refcount_dec_and_test(&capsnap->nref)) { if (capsnap->xattr_blob) ceph_buffer_put(capsnap->xattr_blob); - kfree(capsnap); + kmem_cache_free(ceph_cap_snap_cachep, capsnap); } } @@ -884,6 +884,8 @@ struct ceph_snap_realm { struct list_head dirty_item; /* if realm needs new context */ + struct list_head rebuild_item; /* rebuild snap realms _downward_ in hierarchy */ + /* the current set of snaps for this realm */ struct ceph_snap_context *cached_context; @@ -939,7 +941,7 @@ extern void ceph_handle_snap(struct ceph_mds_client *mdsc, struct ceph_msg *msg); extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, struct ceph_cap_snap *capsnap); -extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); +extern void ceph_cleanup_global_and_empty_realms(struct ceph_mds_client *mdsc); extern struct ceph_snapid_map *ceph_get_snapid_map(struct ceph_mds_client *mdsc, u64 snap); @@ -1049,6 +1051,7 @@ static inline bool ceph_inode_is_shutdown(struct inode *inode) /* xattr.c */ int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int); +int ceph_do_getvxattr(struct inode *inode, const char *name, void *value, size_t size); ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t); extern ssize_t ceph_listxattr(struct dentry *, char *, size_t); extern struct ceph_buffer *__ceph_build_xattrs_blob(struct ceph_inode_info *ci); @@ -1214,7 +1217,7 @@ extern void __ceph_touch_fmode(struct ceph_inode_info *ci, /* addr.c */ extern const struct address_space_operations ceph_aops; extern int ceph_mmap(struct file *file, struct vm_area_struct *vma); -extern int ceph_uninline_data(struct file *filp, struct page *locked_page); +extern int ceph_uninline_data(struct file *file); extern int ceph_pool_perm_check(struct inode *inode, int need); extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc); int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invalidate); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index fcf7dfdecf96603fe56085460dc68a6b7d34b6fd..afec840884719f0d5702a12b0ff9ce152e3e9a2c 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -923,10 +923,13 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_xattr *xattr; - struct ceph_vxattr *vxattr = NULL; + struct ceph_vxattr *vxattr; int req_mask; ssize_t err; + if (strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN)) + goto handle_non_vxattrs; + /* let's see if a virtual xattr was requested */ vxattr = ceph_match_vxattr(inode, name); if (vxattr) { @@ -945,8 +948,14 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, err = -ERANGE; } return err; + } else { + err = ceph_do_getvxattr(inode, name, value, size); + /* this would happen with a new client and old server combo */ + if (err == -EOPNOTSUPP) + err = -ENODATA; + return err; } - +handle_non_vxattrs: req_mask = __get_request_mask(inode); spin_lock(&ci->i_ceph_lock); diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 7ad6c3d0db7da1f2f76e26b540b0bfef75df15be..86bf82dbd8b81ab78043da89660f9fd0a71644b6 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -28,8 +28,8 @@ #define CEPH_INO_ROOT 1 -#define CEPH_INO_CEPH 2 /* hidden .ceph dir */ -#define CEPH_INO_DOTDOT 3 /* used by ceph fuse for parent (..) */ +#define CEPH_INO_CEPH 2 /* hidden .ceph dir */ +#define CEPH_INO_GLOBAL_SNAPREALM 3 /* global dummy snaprealm */ /* arbitrary limit on max # of monitors (cluster of 3 is typical) */ #define CEPH_MAX_MON 31 @@ -328,6 +328,7 @@ enum { CEPH_MDS_OP_LOOKUPPARENT = 0x00103, CEPH_MDS_OP_LOOKUPINO = 0x00104, CEPH_MDS_OP_LOOKUPNAME = 0x00105, + CEPH_MDS_OP_GETVXATTR = 0x00106, CEPH_MDS_OP_SETXATTR = 0x01105, CEPH_MDS_OP_RMXATTR = 0x01106, diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index edf62eaa6285275a3bbd20824b9af1cbf7a4620e..00af2c98da75a7f3f36094ceb193b9d11d3171f6 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -284,6 +284,7 @@ DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld) extern struct kmem_cache *ceph_inode_cachep; extern struct kmem_cache *ceph_cap_cachep; +extern struct kmem_cache *ceph_cap_snap_cachep; extern struct kmem_cache *ceph_cap_flush_cachep; extern struct kmem_cache *ceph_dentry_cachep; extern struct kmem_cache *ceph_file_cachep; diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c index c81379f93ad51e4dd5b37f71f38f8e16affd4f19..c6e5bfc717d5460fb8b4238da7484aadeee2a6d8 100644 --- a/net/ceph/messenger_v2.c +++ b/net/ceph/messenger_v2.c @@ -1773,10 +1773,8 @@ static int prepare_read_data(struct ceph_connection *con) bv.bv_page = con->bounce_page; bv.bv_offset = 0; - set_in_bvec(con, &bv); - } else { - set_in_bvec(con, &bv); } + set_in_bvec(con, &bv); con->v2.in_state = IN_S_PREPARE_READ_DATA_CONT; return 0; } @@ -1807,10 +1805,8 @@ static void prepare_read_data_cont(struct ceph_connection *con) if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { bv.bv_page = con->bounce_page; bv.bv_offset = 0; - set_in_bvec(con, &bv); - } else { - set_in_bvec(con, &bv); } + set_in_bvec(con, &bv); WARN_ON(con->v2.in_state != IN_S_PREPARE_READ_DATA_CONT); return; }