提交 dc594c39 编写于 作者: L Linus Torvalds

Merge tag 'ceph-for-4.18-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "The main piece is a set of libceph changes that revamps how OSD
  requests are aborted, improving CephFS ENOSPC handling and making
  "umount -f" actually work (Zheng and myself).

  The rest is mostly mount option handling cleanups from Chengguang and
  assorted fixes from Zheng, Luis and Dongsheng.

* tag 'ceph-for-4.18-rc1' of git://github.com/ceph/ceph-client: (31 commits)
  rbd: flush rbd_dev->watch_dwork after watch is unregistered
  ceph: update description of some mount options
  ceph: show ino32 if the value is different with default
  ceph: strengthen rsize/wsize/readdir_max_bytes validation
  ceph: fix alignment of rasize
  ceph: fix use-after-free in ceph_statfs()
  ceph: prevent i_version from going back
  ceph: fix wrong check for the case of updating link count
  libceph: allocate the locator string with GFP_NOFAIL
  libceph: make abort_on_full a per-osdc setting
  libceph: don't abort reads in ceph_osdc_abort_on_full()
  libceph: avoid a use-after-free during map check
  libceph: don't warn if req->r_abort_on_full is set
  libceph: use for_each_request() in ceph_osdc_abort_on_full()
  libceph: defer __complete_request() to a workqueue
  libceph: move more code into __complete_request()
  libceph: no need to call flush_workqueue() before destruction
  ceph: flush pending works before shutdown super
  ceph: abort osd requests on force umount
  libceph: introduce ceph_osdc_abort_requests()
  ...
......@@ -105,15 +105,13 @@ Mount Options
address its connection to the monitor originates from.
wsize=X
Specify the maximum write size in bytes. By default there is no
maximum. Ceph will normally size writes based on the file stripe
size.
Specify the maximum write size in bytes. Default: 16 MB.
rsize=X
Specify the maximum read size in bytes. Default: 64 MB.
Specify the maximum read size in bytes. Default: 16 MB.
rasize=X
Specify the maximum readahead. Default: 8 MB.
Specify the maximum readahead size in bytes. Default: 8 MB.
mount_timeout=X
Specify the timeout value for mount (in seconds), in the case
......
......@@ -2339,6 +2339,7 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
{
unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
int ret;
dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
......@@ -2353,6 +2354,11 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
if (!obj_req->osd_req)
return -ENOMEM;
ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
"copyup");
if (ret)
return ret;
/*
* Only send non-zero copyup data to save some I/O and network
* bandwidth -- zero copyup data is equivalent to the object not
......@@ -2362,9 +2368,6 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
bytes = 0;
}
osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
"copyup");
osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
obj_req->copyup_bvecs,
obj_req->copyup_bvec_count,
......@@ -3397,7 +3400,6 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev)
{
dout("%s rbd_dev %p\n", __func__, rbd_dev);
cancel_delayed_work_sync(&rbd_dev->watch_dwork);
cancel_work_sync(&rbd_dev->acquired_lock_work);
cancel_work_sync(&rbd_dev->released_lock_work);
cancel_delayed_work_sync(&rbd_dev->lock_dwork);
......@@ -3415,6 +3417,7 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev)
rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
mutex_unlock(&rbd_dev->watch_mutex);
cancel_delayed_work_sync(&rbd_dev->watch_dwork);
ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
}
......
......@@ -1936,7 +1936,6 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
wr_req->r_mtime = ci->vfs_inode.i_mtime;
wr_req->r_abort_on_full = true;
err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
if (!err)
......
......@@ -69,6 +69,8 @@ static char *gcap_string(char *s, int c)
*s++ = 'w';
if (c & CEPH_CAP_GBUFFER)
*s++ = 'b';
if (c & CEPH_CAP_GWREXTEND)
*s++ = 'a';
if (c & CEPH_CAP_GLAZYIO)
*s++ = 'l';
return s;
......@@ -3022,30 +3024,41 @@ static void invalidate_aliases(struct inode *inode)
dput(prev);
}
struct cap_extra_info {
struct ceph_string *pool_ns;
/* inline data */
u64 inline_version;
void *inline_data;
u32 inline_len;
/* dirstat */
bool dirstat_valid;
u64 nfiles;
u64 nsubdirs;
/* currently issued */
int issued;
};
/*
* Handle a cap GRANT message from the MDS. (Note that a GRANT may
* actually be a revocation if it specifies a smaller cap set.)
*
* caller holds s_mutex and i_ceph_lock, we drop both.
*/
static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant,
struct ceph_string **pns, u64 inline_version,
void *inline_data, u32 inline_len,
struct ceph_buffer *xattr_buf,
static void handle_cap_grant(struct inode *inode,
struct ceph_mds_session *session,
struct ceph_cap *cap, int issued)
struct ceph_cap *cap,
struct ceph_mds_caps *grant,
struct ceph_buffer *xattr_buf,
struct cap_extra_info *extra_info)
__releases(ci->i_ceph_lock)
__releases(mdsc->snap_rwsem)
__releases(session->s_mdsc->snap_rwsem)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
int seq = le32_to_cpu(grant->seq);
int newcaps = le32_to_cpu(grant->caps);
int used, wanted, dirty;
u64 size = le64_to_cpu(grant->size);
u64 max_size = le64_to_cpu(grant->max_size);
struct timespec mtime, atime, ctime;
int check_caps = 0;
bool wake = false;
bool writeback = false;
......@@ -3055,7 +3068,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
bool fill_inline = false;
dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
inode, cap, mds, seq, ceph_cap_string(newcaps));
inode, cap, session->s_mds, seq, ceph_cap_string(newcaps));
dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
inode->i_size);
......@@ -3101,7 +3114,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
__check_cap_issue(ci, cap, newcaps);
if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
(issued & CEPH_CAP_AUTH_EXCL) == 0) {
(extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
inode->i_mode = le32_to_cpu(grant->mode);
inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
......@@ -3110,15 +3123,16 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
from_kgid(&init_user_ns, inode->i_gid));
}
if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
(issued & CEPH_CAP_LINK_EXCL) == 0) {
if ((newcaps & CEPH_CAP_LINK_SHARED) &&
(extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) {
set_nlink(inode, le32_to_cpu(grant->nlink));
if (inode->i_nlink == 0 &&
(newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
deleted_inode = true;
}
if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 &&
grant->xattr_len) {
int len = le32_to_cpu(grant->xattr_len);
u64 version = le64_to_cpu(grant->xattr_version);
......@@ -3134,15 +3148,21 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
}
if (newcaps & CEPH_CAP_ANY_RD) {
struct timespec mtime, atime, ctime;
/* ctime/mtime/atime? */
ceph_decode_timespec(&mtime, &grant->mtime);
ceph_decode_timespec(&atime, &grant->atime);
ceph_decode_timespec(&ctime, &grant->ctime);
ceph_fill_file_time(inode, issued,
ceph_fill_file_time(inode, extra_info->issued,
le32_to_cpu(grant->time_warp_seq),
&ctime, &mtime, &atime);
}
if ((newcaps & CEPH_CAP_FILE_SHARED) && extra_info->dirstat_valid) {
ci->i_files = extra_info->nfiles;
ci->i_subdirs = extra_info->nsubdirs;
}
if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
/* file layout may have changed */
s64 old_pool = ci->i_layout.pool_id;
......@@ -3151,15 +3171,16 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
lockdep_is_held(&ci->i_ceph_lock));
rcu_assign_pointer(ci->i_layout.pool_ns, *pns);
rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns);
if (ci->i_layout.pool_id != old_pool || *pns != old_ns)
if (ci->i_layout.pool_id != old_pool ||
extra_info->pool_ns != old_ns)
ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
*pns = old_ns;
extra_info->pool_ns = old_ns;
/* size/truncate_seq? */
queue_trunc = ceph_fill_file_size(inode, issued,
queue_trunc = ceph_fill_file_size(inode, extra_info->issued,
le32_to_cpu(grant->truncate_seq),
le64_to_cpu(grant->truncate_size),
size);
......@@ -3238,24 +3259,26 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
}
BUG_ON(cap->issued & ~cap->implemented);
if (inline_version > 0 && inline_version >= ci->i_inline_version) {
ci->i_inline_version = inline_version;
if (extra_info->inline_version > 0 &&
extra_info->inline_version >= ci->i_inline_version) {
ci->i_inline_version = extra_info->inline_version;
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
fill_inline = true;
}
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
if (newcaps & ~issued)
if (newcaps & ~extra_info->issued)
wake = true;
kick_flushing_inode_caps(mdsc, session, inode);
up_read(&mdsc->snap_rwsem);
kick_flushing_inode_caps(session->s_mdsc, session, inode);
up_read(&session->s_mdsc->snap_rwsem);
} else {
spin_unlock(&ci->i_ceph_lock);
}
if (fill_inline)
ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
extra_info->inline_len);
if (queue_trunc)
ceph_queue_vmtruncate(inode);
......@@ -3720,31 +3743,25 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_msg *msg)
{
struct ceph_mds_client *mdsc = session->s_mdsc;
struct super_block *sb = mdsc->fsc->sb;
struct inode *inode;
struct ceph_inode_info *ci;
struct ceph_cap *cap;
struct ceph_mds_caps *h;
struct ceph_mds_cap_peer *peer = NULL;
struct ceph_snap_realm *realm = NULL;
struct ceph_string *pool_ns = NULL;
int mds = session->s_mds;
int op, issued;
int op;
int msg_version = le16_to_cpu(msg->hdr.version);
u32 seq, mseq;
struct ceph_vino vino;
u64 tid;
u64 inline_version = 0;
void *inline_data = NULL;
u32 inline_len = 0;
void *snaptrace;
size_t snaptrace_len;
void *p, *end;
struct cap_extra_info extra_info = {};
dout("handle_caps from mds%d\n", mds);
dout("handle_caps from mds%d\n", session->s_mds);
/* decode */
end = msg->front.iov_base + msg->front.iov_len;
tid = le64_to_cpu(msg->hdr.tid);
if (msg->front.iov_len < sizeof(*h))
goto bad;
h = msg->front.iov_base;
......@@ -3758,7 +3775,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
snaptrace_len = le32_to_cpu(h->snap_trace_len);
p = snaptrace + snaptrace_len;
if (le16_to_cpu(msg->hdr.version) >= 2) {
if (msg_version >= 2) {
u32 flock_len;
ceph_decode_32_safe(&p, end, flock_len, bad);
if (p + flock_len > end)
......@@ -3766,7 +3783,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
p += flock_len;
}
if (le16_to_cpu(msg->hdr.version) >= 3) {
if (msg_version >= 3) {
if (op == CEPH_CAP_OP_IMPORT) {
if (p + sizeof(*peer) > end)
goto bad;
......@@ -3778,16 +3795,16 @@ void ceph_handle_caps(struct ceph_mds_session *session,
}
}
if (le16_to_cpu(msg->hdr.version) >= 4) {
ceph_decode_64_safe(&p, end, inline_version, bad);
ceph_decode_32_safe(&p, end, inline_len, bad);
if (p + inline_len > end)
if (msg_version >= 4) {
ceph_decode_64_safe(&p, end, extra_info.inline_version, bad);
ceph_decode_32_safe(&p, end, extra_info.inline_len, bad);
if (p + extra_info.inline_len > end)
goto bad;
inline_data = p;
p += inline_len;
extra_info.inline_data = p;
p += extra_info.inline_len;
}
if (le16_to_cpu(msg->hdr.version) >= 5) {
if (msg_version >= 5) {
struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
u32 epoch_barrier;
......@@ -3795,7 +3812,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
ceph_osdc_update_epoch_barrier(osdc, epoch_barrier);
}
if (le16_to_cpu(msg->hdr.version) >= 8) {
if (msg_version >= 8) {
u64 flush_tid;
u32 caller_uid, caller_gid;
u32 pool_ns_len;
......@@ -3809,13 +3826,33 @@ void ceph_handle_caps(struct ceph_mds_session *session,
ceph_decode_32_safe(&p, end, pool_ns_len, bad);
if (pool_ns_len > 0) {
ceph_decode_need(&p, end, pool_ns_len, bad);
pool_ns = ceph_find_or_create_string(p, pool_ns_len);
extra_info.pool_ns =
ceph_find_or_create_string(p, pool_ns_len);
p += pool_ns_len;
}
}
if (msg_version >= 11) {
struct ceph_timespec *btime;
u64 change_attr;
u32 flags;
/* version >= 9 */
if (p + sizeof(*btime) > end)
goto bad;
btime = p;
p += sizeof(*btime);
ceph_decode_64_safe(&p, end, change_attr, bad);
/* version >= 10 */
ceph_decode_32_safe(&p, end, flags, bad);
/* version >= 11 */
extra_info.dirstat_valid = true;
ceph_decode_64_safe(&p, end, extra_info.nfiles, bad);
ceph_decode_64_safe(&p, end, extra_info.nsubdirs, bad);
}
/* lookup ino */
inode = ceph_find_inode(sb, vino);
inode = ceph_find_inode(mdsc->fsc->sb, vino);
ci = ceph_inode(inode);
dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
vino.snap, inode);
......@@ -3848,7 +3885,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
/* these will work even if we don't have a cap yet */
switch (op) {
case CEPH_CAP_OP_FLUSHSNAP_ACK:
handle_cap_flushsnap_ack(inode, tid, h, session);
handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid),
h, session);
goto done;
case CEPH_CAP_OP_EXPORT:
......@@ -3867,10 +3905,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
down_read(&mdsc->snap_rwsem);
}
handle_cap_import(mdsc, inode, h, peer, session,
&cap, &issued);
handle_cap_grant(mdsc, inode, h, &pool_ns,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
&cap, &extra_info.issued);
handle_cap_grant(inode, session, cap,
h, msg->middle, &extra_info);
if (realm)
ceph_put_snap_realm(mdsc, realm);
goto done_unlocked;
......@@ -3878,10 +3915,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
/* the rest require a cap */
spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ceph_inode(inode), mds);
cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds);
if (!cap) {
dout(" no cap on %p ino %llx.%llx from mds%d\n",
inode, ceph_ino(inode), ceph_snap(inode), mds);
inode, ceph_ino(inode), ceph_snap(inode),
session->s_mds);
spin_unlock(&ci->i_ceph_lock);
goto flush_cap_releases;
}
......@@ -3890,15 +3928,15 @@ void ceph_handle_caps(struct ceph_mds_session *session,
switch (op) {
case CEPH_CAP_OP_REVOKE:
case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
handle_cap_grant(mdsc, inode, h, &pool_ns,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
__ceph_caps_issued(ci, &extra_info.issued);
extra_info.issued |= __ceph_caps_dirty(ci);
handle_cap_grant(inode, session, cap,
h, msg->middle, &extra_info);
goto done_unlocked;
case CEPH_CAP_OP_FLUSH_ACK:
handle_cap_flush_ack(inode, tid, h, session, cap);
handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid),
h, session, cap);
break;
case CEPH_CAP_OP_TRUNC:
......@@ -3925,7 +3963,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
mutex_unlock(&session->s_mutex);
done_unlocked:
iput(inode);
ceph_put_string(pool_ns);
ceph_put_string(extra_info.pool_ns);
return;
bad:
......
......@@ -1486,6 +1486,8 @@ const struct file_operations ceph_dir_fops = {
.release = ceph_release,
.unlocked_ioctl = ceph_ioctl,
.fsync = ceph_fsync,
.lock = ceph_lock,
.flock = ceph_flock,
};
const struct file_operations ceph_snapdir_fops = {
......
......@@ -895,7 +895,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
req->r_callback = ceph_aio_complete_req;
req->r_inode = inode;
req->r_priv = aio_req;
req->r_abort_on_full = true;
ret = ceph_osdc_start_request(req->r_osdc, req, false);
out:
......
......@@ -739,7 +739,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_mds_reply_inode *info = iinfo->in;
struct ceph_inode_info *ci = ceph_inode(inode);
int issued = 0, implemented, new_issued;
int issued, new_issued, info_caps;
struct timespec mtime, atime, ctime;
struct ceph_buffer *xattr_blob = NULL;
struct ceph_string *pool_ns = NULL;
......@@ -754,8 +754,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
inode, ceph_vinop(inode), le64_to_cpu(info->version),
ci->i_version);
info_caps = le32_to_cpu(info->cap.caps);
/* prealloc new cap struct */
if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
if (info_caps && ceph_snap(inode) == CEPH_NOSNAP)
new_cap = ceph_get_cap(mdsc, caps_reservation);
/*
......@@ -792,9 +794,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
le64_to_cpu(info->version) > (ci->i_version & ~1)))
new_version = true;
issued = __ceph_caps_issued(ci, &implemented);
issued |= implemented | __ceph_caps_dirty(ci);
new_issued = ~issued & le32_to_cpu(info->cap.caps);
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
new_issued = ~issued & info_caps;
/* update inode */
inode->i_rdev = le32_to_cpu(info->rdev);
......@@ -826,6 +828,11 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
&ctime, &mtime, &atime);
}
if (new_version || (info_caps & CEPH_CAP_FILE_SHARED)) {
ci->i_files = le64_to_cpu(info->files);
ci->i_subdirs = le64_to_cpu(info->subdirs);
}
if (new_version ||
(new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
s64 old_pool = ci->i_layout.pool_id;
......@@ -854,6 +861,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
}
}
/* layout and rstat are not tracked by capability, update them if
* the inode info is from auth mds */
if (new_version || (info->cap.flags & CEPH_CAP_FLAG_AUTH)) {
if (S_ISDIR(inode->i_mode)) {
ci->i_dir_layout = iinfo->dir_layout;
ci->i_rbytes = le64_to_cpu(info->rbytes);
ci->i_rfiles = le64_to_cpu(info->rfiles);
ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
ceph_decode_timespec(&ci->i_rctime, &info->rctime);
}
}
/* xattrs */
/* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) &&
......@@ -870,7 +889,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
}
/* finally update i_version */
ci->i_version = le64_to_cpu(info->version);
if (le64_to_cpu(info->version) > ci->i_version)
ci->i_version = le64_to_cpu(info->version);
inode->i_mapping->a_ops = &ceph_aops;
......@@ -918,15 +938,6 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
case S_IFDIR:
inode->i_op = &ceph_dir_iops;
inode->i_fop = &ceph_dir_fops;
ci->i_dir_layout = iinfo->dir_layout;
ci->i_files = le64_to_cpu(info->files);
ci->i_subdirs = le64_to_cpu(info->subdirs);
ci->i_rbytes = le64_to_cpu(info->rbytes);
ci->i_rfiles = le64_to_cpu(info->rfiles);
ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
ceph_decode_timespec(&ci->i_rctime, &info->rctime);
break;
default:
pr_err("fill_inode %llx.%llx BAD mode 0%o\n",
......@@ -934,12 +945,11 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
}
/* were we issued a capability? */
if (info->cap.caps) {
if (info_caps) {
if (ceph_snap(inode) == CEPH_NOSNAP) {
unsigned caps = le32_to_cpu(info->cap.caps);
ceph_add_cap(inode, session,
le64_to_cpu(info->cap.cap_id),
cap_fmode, caps,
cap_fmode, info_caps,
le32_to_cpu(info->cap.wanted),
le32_to_cpu(info->cap.seq),
le32_to_cpu(info->cap.mseq),
......@@ -949,7 +959,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
/* set dir completion flag? */
if (S_ISDIR(inode->i_mode) &&
ci->i_files == 0 && ci->i_subdirs == 0 &&
(caps & CEPH_CAP_FILE_SHARED) &&
(info_caps & CEPH_CAP_FILE_SHARED) &&
(issued & CEPH_CAP_FILE_EXCL) == 0 &&
!__ceph_dir_is_complete(ci)) {
dout(" marking %p complete (empty)\n", inode);
......@@ -962,8 +972,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
wake = true;
} else {
dout(" %p got snap_caps %s\n", inode,
ceph_cap_string(le32_to_cpu(info->cap.caps)));
ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
ceph_cap_string(info_caps));
ci->i_snap_caps |= info_caps;
if (cap_fmode >= 0)
__ceph_get_fmode(ci, cap_fmode);
}
......@@ -978,8 +988,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
ci->i_inline_version = iinfo->inline_version;
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(locked_page ||
(le32_to_cpu(info->cap.caps) & cache_caps)))
(locked_page || (info_caps & cache_caps)))
fill_inline = true;
}
......@@ -2178,6 +2187,7 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_mds_request *req;
int mode;
int err;
if (ceph_snap(inode) == CEPH_SNAPDIR) {
......@@ -2190,7 +2200,8 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
return 0;
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS;
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
if (IS_ERR(req))
return PTR_ERR(req);
req->r_inode = inode;
......@@ -2261,6 +2272,14 @@ int ceph_getattr(const struct path *path, struct kstat *stat,
stat->size = ci->i_files + ci->i_subdirs;
stat->blocks = 0;
stat->blksize = 65536;
/*
* Some applications rely on the number of st_nlink
* value on directories to be either 0 (if unlinked)
* or 2 + number of subdirectories.
*/
if (stat->nlink == 1)
/* '.' + '..' + subdirs */
stat->nlink = 1 + 1 + ci->i_subdirs;
}
}
return err;
......
......@@ -45,7 +45,7 @@ static void ceph_put_super(struct super_block *s)
static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(d_inode(dentry));
struct ceph_monmap *monmap = fsc->client->monc.monmap;
struct ceph_mon_client *monc = &fsc->client->monc;
struct ceph_statfs st;
u64 fsid;
int err;
......@@ -58,7 +58,7 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
}
dout("statfs\n");
err = ceph_monc_do_statfs(&fsc->client->monc, data_pool, &st);
err = ceph_monc_do_statfs(monc, data_pool, &st);
if (err < 0)
return err;
......@@ -94,8 +94,11 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
buf->f_namelen = NAME_MAX;
/* Must convert the fsid, for consistent values across arches */
fsid = le64_to_cpu(*(__le64 *)(&monmap->fsid)) ^
le64_to_cpu(*((__le64 *)&monmap->fsid + 1));
mutex_lock(&monc->mutex);
fsid = le64_to_cpu(*(__le64 *)(&monc->monmap->fsid)) ^
le64_to_cpu(*((__le64 *)&monc->monmap->fsid + 1));
mutex_unlock(&monc->mutex);
buf->f_fsid.val[0] = fsid & 0xffffffff;
buf->f_fsid.val[1] = fsid >> 32;
......@@ -256,19 +259,19 @@ static int parse_fsopt_token(char *c, void *private)
break;
/* misc */
case Opt_wsize:
if (intval < PAGE_SIZE || intval > CEPH_MAX_WRITE_SIZE)
if (intval < (int)PAGE_SIZE || intval > CEPH_MAX_WRITE_SIZE)
return -EINVAL;
fsopt->wsize = ALIGN(intval, PAGE_SIZE);
break;
case Opt_rsize:
if (intval < PAGE_SIZE || intval > CEPH_MAX_READ_SIZE)
if (intval < (int)PAGE_SIZE || intval > CEPH_MAX_READ_SIZE)
return -EINVAL;
fsopt->rsize = ALIGN(intval, PAGE_SIZE);
break;
case Opt_rasize:
if (intval < 0)
return -EINVAL;
fsopt->rasize = ALIGN(intval + PAGE_SIZE - 1, PAGE_SIZE);
fsopt->rasize = ALIGN(intval, PAGE_SIZE);
break;
case Opt_caps_wanted_delay_min:
if (intval < 1)
......@@ -286,7 +289,7 @@ static int parse_fsopt_token(char *c, void *private)
fsopt->max_readdir = intval;
break;
case Opt_readdir_max_bytes:
if (intval < PAGE_SIZE && intval != 0)
if (intval < (int)PAGE_SIZE && intval != 0)
return -EINVAL;
fsopt->max_readdir_bytes = intval;
break;
......@@ -534,6 +537,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_puts(m, ",noasyncreaddir");
if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
seq_puts(m, ",nodcache");
if (fsopt->flags & CEPH_MOUNT_OPT_INO32)
seq_puts(m, ",ino32");
if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) {
seq_show_option(m, "fsc", fsopt->fscache_uniq);
}
......@@ -551,7 +556,7 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
if (fsopt->mds_namespace)
seq_show_option(m, "mds_namespace", fsopt->mds_namespace);
if (fsopt->wsize)
if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)
seq_printf(m, ",wsize=%d", fsopt->wsize);
if (fsopt->rsize != CEPH_MAX_READ_SIZE)
seq_printf(m, ",rsize=%d", fsopt->rsize);
......@@ -616,7 +621,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
err = PTR_ERR(fsc->client);
goto fail;
}
fsc->client->extra_mon_dispatch = extra_mon_dispatch;
fsc->client->osdc.abort_on_full = true;
if (!fsopt->mds_namespace) {
ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
......@@ -674,6 +681,13 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
return ERR_PTR(err);
}
static void flush_fs_workqueues(struct ceph_fs_client *fsc)
{
flush_workqueue(fsc->wb_wq);
flush_workqueue(fsc->pg_inv_wq);
flush_workqueue(fsc->trunc_wq);
}
static void destroy_fs_client(struct ceph_fs_client *fsc)
{
dout("destroy_fs_client %p\n", fsc);
......@@ -793,6 +807,7 @@ static void ceph_umount_begin(struct super_block *sb)
if (!fsc)
return;
fsc->mount_state = CEPH_MOUNT_SHUTDOWN;
ceph_osdc_abort_requests(&fsc->client->osdc, -EIO);
ceph_mdsc_force_umount(fsc->mdsc);
return;
}
......@@ -1088,6 +1103,8 @@ static void ceph_kill_sb(struct super_block *s)
dout("kill_sb %p\n", s);
ceph_mdsc_pre_umount(fsc->mdsc);
flush_fs_workqueues(fsc);
generic_shutdown_super(s);
fsc->client->extra_mon_dispatch = NULL;
......
......@@ -50,10 +50,14 @@ struct ceph_vxattr {
size_t name_size; /* strlen(name) + 1 (for '\0') */
size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
size_t size);
bool readonly, hidden;
bool (*exists_cb)(struct ceph_inode_info *ci);
unsigned int flags;
};
#define VXATTR_FLAG_READONLY (1<<0)
#define VXATTR_FLAG_HIDDEN (1<<1)
#define VXATTR_FLAG_RSTAT (1<<2)
/* layouts */
static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
......@@ -262,32 +266,31 @@ static size_t ceph_vxattrcb_quota_max_files(struct ceph_inode_info *ci,
#define CEPH_XATTR_NAME2(_type, _name, _name2) \
XATTR_CEPH_PREFIX #_type "." #_name "." #_name2
#define XATTR_NAME_CEPH(_type, _name) \
#define XATTR_NAME_CEPH(_type, _name, _flags) \
{ \
.name = CEPH_XATTR_NAME(_type, _name), \
.name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \
.getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \
.readonly = true, \
.hidden = false, \
.exists_cb = NULL, \
.exists_cb = NULL, \
.flags = (VXATTR_FLAG_READONLY | _flags), \
}
#define XATTR_RSTAT_FIELD(_type, _name) \
XATTR_NAME_CEPH(_type, _name, VXATTR_FLAG_RSTAT)
#define XATTR_LAYOUT_FIELD(_type, _name, _field) \
{ \
.name = CEPH_XATTR_NAME2(_type, _name, _field), \
.name_size = sizeof (CEPH_XATTR_NAME2(_type, _name, _field)), \
.getxattr_cb = ceph_vxattrcb_ ## _name ## _ ## _field, \
.readonly = false, \
.hidden = true, \
.exists_cb = ceph_vxattrcb_layout_exists, \
.flags = VXATTR_FLAG_HIDDEN, \
}
#define XATTR_QUOTA_FIELD(_type, _name) \
{ \
.name = CEPH_XATTR_NAME(_type, _name), \
.name_size = sizeof(CEPH_XATTR_NAME(_type, _name)), \
.getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \
.readonly = false, \
.hidden = true, \
.exists_cb = ceph_vxattrcb_quota_exists, \
.flags = VXATTR_FLAG_HIDDEN, \
}
static struct ceph_vxattr ceph_dir_vxattrs[] = {
......@@ -295,30 +298,28 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
.name = "ceph.dir.layout",
.name_size = sizeof("ceph.dir.layout"),
.getxattr_cb = ceph_vxattrcb_layout,
.readonly = false,
.hidden = true,
.exists_cb = ceph_vxattrcb_layout_exists,
.flags = VXATTR_FLAG_HIDDEN,
},
XATTR_LAYOUT_FIELD(dir, layout, stripe_unit),
XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
XATTR_LAYOUT_FIELD(dir, layout, object_size),
XATTR_LAYOUT_FIELD(dir, layout, pool),
XATTR_LAYOUT_FIELD(dir, layout, pool_namespace),
XATTR_NAME_CEPH(dir, entries),
XATTR_NAME_CEPH(dir, files),
XATTR_NAME_CEPH(dir, subdirs),
XATTR_NAME_CEPH(dir, rentries),
XATTR_NAME_CEPH(dir, rfiles),
XATTR_NAME_CEPH(dir, rsubdirs),
XATTR_NAME_CEPH(dir, rbytes),
XATTR_NAME_CEPH(dir, rctime),
XATTR_NAME_CEPH(dir, entries, 0),
XATTR_NAME_CEPH(dir, files, 0),
XATTR_NAME_CEPH(dir, subdirs, 0),
XATTR_RSTAT_FIELD(dir, rentries),
XATTR_RSTAT_FIELD(dir, rfiles),
XATTR_RSTAT_FIELD(dir, rsubdirs),
XATTR_RSTAT_FIELD(dir, rbytes),
XATTR_RSTAT_FIELD(dir, rctime),
{
.name = "ceph.quota",
.name_size = sizeof("ceph.quota"),
.getxattr_cb = ceph_vxattrcb_quota,
.readonly = false,
.hidden = true,
.exists_cb = ceph_vxattrcb_quota_exists,
.flags = VXATTR_FLAG_HIDDEN,
},
XATTR_QUOTA_FIELD(quota, max_bytes),
XATTR_QUOTA_FIELD(quota, max_files),
......@@ -333,9 +334,8 @@ static struct ceph_vxattr ceph_file_vxattrs[] = {
.name = "ceph.file.layout",
.name_size = sizeof("ceph.file.layout"),
.getxattr_cb = ceph_vxattrcb_layout,
.readonly = false,
.hidden = true,
.exists_cb = ceph_vxattrcb_layout_exists,
.flags = VXATTR_FLAG_HIDDEN,
},
XATTR_LAYOUT_FIELD(file, layout, stripe_unit),
XATTR_LAYOUT_FIELD(file, layout, stripe_count),
......@@ -374,9 +374,10 @@ static size_t __init vxattrs_name_size(struct ceph_vxattr *vxattrs)
struct ceph_vxattr *vxattr;
size_t size = 0;
for (vxattr = vxattrs; vxattr->name; vxattr++)
if (!vxattr->hidden)
for (vxattr = vxattrs; vxattr->name; vxattr++) {
if (!(vxattr->flags & VXATTR_FLAG_HIDDEN))
size += vxattr->name_size;
}
return size;
}
......@@ -809,7 +810,10 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
/* let's see if a virtual xattr was requested */
vxattr = ceph_match_vxattr(inode, name);
if (vxattr) {
err = ceph_do_getattr(inode, 0, true);
int mask = 0;
if (vxattr->flags & VXATTR_FLAG_RSTAT)
mask |= CEPH_STAT_RSTAT;
err = ceph_do_getattr(inode, mask, true);
if (err)
return err;
err = -ENODATA;
......@@ -919,7 +923,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
err = namelen;
if (vxattrs) {
for (i = 0; vxattrs[i].name; i++) {
if (!vxattrs[i].hidden &&
if (!(vxattrs[i].flags & VXATTR_FLAG_HIDDEN) &&
!(vxattrs[i].exists_cb &&
!vxattrs[i].exists_cb(ci))) {
len = sprintf(names, "%s", vxattrs[i].name);
......@@ -1024,7 +1028,7 @@ int __ceph_setxattr(struct inode *inode, const char *name,
vxattr = ceph_match_vxattr(inode, name);
if (vxattr) {
if (vxattr->readonly)
if (vxattr->flags & VXATTR_FLAG_READONLY)
return -EOPNOTSUPP;
if (value && !strncmp(vxattr->name, "ceph.quota", 10))
check_realm = true;
......
......@@ -628,6 +628,7 @@ int ceph_flags_to_mode(int flags);
CEPH_CAP_XATTR_SHARED)
#define CEPH_STAT_CAP_INLINE_DATA (CEPH_CAP_FILE_SHARED | \
CEPH_CAP_FILE_RD)
#define CEPH_STAT_RSTAT CEPH_CAP_FILE_WREXTEND
#define CEPH_CAP_ANY_SHARED (CEPH_CAP_AUTH_SHARED | \
CEPH_CAP_LINK_SHARED | \
......
......@@ -170,6 +170,7 @@ struct ceph_osd_request {
u64 r_tid; /* unique for this client */
struct rb_node r_node;
struct rb_node r_mc_node; /* map check */
struct work_struct r_complete_work;
struct ceph_osd *r_osd;
struct ceph_osd_request_target r_t;
......@@ -201,7 +202,6 @@ struct ceph_osd_request {
struct timespec r_mtime; /* ditto */
u64 r_data_offset; /* ditto */
bool r_linger; /* don't resend on failure */
bool r_abort_on_full; /* return ENOSPC when full */
/* internal */
unsigned long r_stamp; /* jiffies, send or check time */
......@@ -347,6 +347,8 @@ struct ceph_osd_client {
struct rb_root linger_map_checks;
atomic_t num_requests;
atomic_t num_homeless;
bool abort_on_full; /* abort w/ ENOSPC when full */
int abort_err;
struct delayed_work timeout_work;
struct delayed_work osds_timeout_work;
#ifdef CONFIG_DEBUG_FS
......@@ -359,6 +361,7 @@ struct ceph_osd_client {
struct ceph_msgpool msgpool_op_reply;
struct workqueue_struct *notify_wq;
struct workqueue_struct *completion_wq;
};
static inline bool ceph_osdmap_flag(struct ceph_osd_client *osdc, int flag)
......@@ -378,6 +381,7 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
struct ceph_msg *msg);
void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb);
void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err);
extern void osd_req_op_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode, u32 flags);
......@@ -440,7 +444,7 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
struct page **pages, u64 length,
u32 alignment, bool pages_from_pool,
bool own_pages);
extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
extern int osd_req_op_cls_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
const char *class, const char *method);
extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
......
......@@ -279,10 +279,10 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting,
const struct ceph_osds *new_acting,
bool any_change);
int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
const struct ceph_object_id *oid,
const struct ceph_object_locator *oloc,
struct ceph_pg *raw_pgid);
void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
const struct ceph_object_id *oid,
const struct ceph_object_locator *oloc,
struct ceph_pg *raw_pgid);
int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
const struct ceph_object_id *oid,
const struct ceph_object_locator *oloc,
......
......@@ -168,12 +168,6 @@ static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
static struct lock_class_key socket_class;
#endif
/*
* When skipping (ignoring) a block of input we read it into a "skip
* buffer," which is this many bytes in size.
*/
#define SKIP_BUF_SIZE 1024
static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con);
static void ceph_con_workfn(struct work_struct *);
......@@ -520,12 +514,18 @@ static int ceph_tcp_connect(struct ceph_connection *con)
return 0;
}
/*
* If @buf is NULL, discard up to @len bytes.
*/
static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
{
struct kvec iov = {buf, len};
struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
int r;
if (!buf)
msg.msg_flags |= MSG_TRUNC;
iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, len);
r = sock_recvmsg(sock, &msg, msg.msg_flags);
if (r == -EAGAIN)
......@@ -2575,9 +2575,6 @@ static int try_write(struct ceph_connection *con)
con->state != CON_STATE_OPEN)
return 0;
more:
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
/* open the socket first? */
if (con->state == CON_STATE_PREOPEN) {
BUG_ON(con->sock);
......@@ -2598,7 +2595,8 @@ static int try_write(struct ceph_connection *con)
}
}
more_kvec:
more:
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
BUG_ON(!con->sock);
/* kvec data queued? */
......@@ -2623,7 +2621,7 @@ static int try_write(struct ceph_connection *con)
ret = write_partial_message_data(con);
if (ret == 1)
goto more_kvec; /* we need to send the footer, too! */
goto more; /* we need to send the footer, too! */
if (ret == 0)
goto out;
if (ret < 0) {
......@@ -2659,8 +2657,6 @@ static int try_write(struct ceph_connection *con)
return ret;
}
/*
* Read what we can from the socket.
*/
......@@ -2721,16 +2717,11 @@ static int try_read(struct ceph_connection *con)
if (con->in_base_pos < 0) {
/*
* skipping + discarding content.
*
* FIXME: there must be a better way to do this!
*/
static char buf[SKIP_BUF_SIZE];
int skip = min((int) sizeof (buf), -con->in_base_pos);
dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
ret = ceph_tcp_recvmsg(con->sock, buf, skip);
ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos);
if (ret <= 0)
goto out;
dout("skipped %d / %d bytes\n", ret, -con->in_base_pos);
con->in_base_pos += ret;
if (con->in_base_pos)
goto more;
......
......@@ -766,7 +766,7 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);
void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, const char *class, const char *method)
{
struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
......@@ -778,7 +778,9 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
BUG_ON(opcode != CEPH_OSD_OP_CALL);
pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
BUG_ON(!pagelist);
if (!pagelist)
return -ENOMEM;
ceph_pagelist_init(pagelist);
op->cls.class_name = class;
......@@ -798,6 +800,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
op->indata_len = payload_len;
return 0;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
......@@ -1026,7 +1029,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
truncate_size, truncate_seq);
}
req->r_abort_on_full = true;
req->r_flags = flags;
req->r_base_oloc.pool = layout->pool_id;
req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
......@@ -1054,6 +1056,38 @@ EXPORT_SYMBOL(ceph_osdc_new_request);
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
/*
* Call @fn on each OSD request as long as @fn returns 0.
*/
static void for_each_request(struct ceph_osd_client *osdc,
int (*fn)(struct ceph_osd_request *req, void *arg),
void *arg)
{
struct rb_node *n, *p;
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
for (p = rb_first(&osd->o_requests); p; ) {
struct ceph_osd_request *req =
rb_entry(p, struct ceph_osd_request, r_node);
p = rb_next(p);
if (fn(req, arg))
return;
}
}
for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
struct ceph_osd_request *req =
rb_entry(p, struct ceph_osd_request, r_node);
p = rb_next(p);
if (fn(req, arg))
return;
}
}
static bool osd_homeless(struct ceph_osd *osd)
{
return osd->o_osd == CEPH_HOMELESS_OSD;
......@@ -1395,7 +1429,6 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
bool recovery_deletes = ceph_osdmap_flag(osdc,
CEPH_OSDMAP_RECOVERY_DELETES);
enum calc_target_result ct_res;
int ret;
t->epoch = osdc->osdmap->epoch;
pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
......@@ -1431,14 +1464,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
}
}
ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc,
&pgid);
if (ret) {
WARN_ON(ret != -ENOENT);
t->osd = CEPH_HOMELESS_OSD;
ct_res = CALC_TARGET_POOL_DNE;
goto out;
}
__ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
last_pgid.pool = pgid.pool;
last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
......@@ -2161,9 +2187,9 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
struct ceph_osd_client *osdc = req->r_osdc;
struct ceph_osd *osd;
enum calc_target_result ct_res;
int err = 0;
bool need_send = false;
bool promoted = false;
bool need_abort = false;
WARN_ON(req->r_tid);
dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
......@@ -2179,7 +2205,10 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
goto promote;
}
if (osdc->osdmap->epoch < osdc->epoch_barrier) {
if (osdc->abort_err) {
dout("req %p abort_err %d\n", req, osdc->abort_err);
err = osdc->abort_err;
} else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
osdc->epoch_barrier);
req->r_t.paused = true;
......@@ -2200,11 +2229,13 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
(ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
pool_full(osdc, req->r_t.base_oloc.pool))) {
dout("req %p full/pool_full\n", req);
pr_warn_ratelimited("FULL or reached pool quota\n");
req->r_t.paused = true;
maybe_request_map(osdc);
if (req->r_abort_on_full)
need_abort = true;
if (osdc->abort_on_full) {
err = -ENOSPC;
} else {
pr_warn_ratelimited("FULL or reached pool quota\n");
req->r_t.paused = true;
maybe_request_map(osdc);
}
} else if (!osd_homeless(osd)) {
need_send = true;
} else {
......@@ -2221,11 +2252,11 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
link_request(osd, req);
if (need_send)
send_request(req);
else if (need_abort)
complete_request(req, -ENOSPC);
else if (err)
complete_request(req, err);
mutex_unlock(&osd->lock);
if (ct_res == CALC_TARGET_POOL_DNE)
if (!err && ct_res == CALC_TARGET_POOL_DNE)
send_map_check(req);
if (promoted)
......@@ -2281,11 +2312,21 @@ static void finish_request(struct ceph_osd_request *req)
static void __complete_request(struct ceph_osd_request *req)
{
if (req->r_callback) {
dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
req->r_tid, req->r_callback, req->r_result);
dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
req->r_tid, req->r_callback, req->r_result);
if (req->r_callback)
req->r_callback(req);
}
complete_all(&req->r_completion);
ceph_osdc_put_request(req);
}
static void complete_request_workfn(struct work_struct *work)
{
struct ceph_osd_request *req =
container_of(work, struct ceph_osd_request, r_complete_work);
__complete_request(req);
}
/*
......@@ -2297,9 +2338,9 @@ static void complete_request(struct ceph_osd_request *req, int err)
req->r_result = err;
finish_request(req);
__complete_request(req);
complete_all(&req->r_completion);
ceph_osdc_put_request(req);
INIT_WORK(&req->r_complete_work, complete_request_workfn);
queue_work(req->r_osdc->completion_wq, &req->r_complete_work);
}
static void cancel_map_check(struct ceph_osd_request *req)
......@@ -2336,6 +2377,28 @@ static void abort_request(struct ceph_osd_request *req, int err)
complete_request(req, err);
}
static int abort_fn(struct ceph_osd_request *req, void *arg)
{
int err = *(int *)arg;
abort_request(req, err);
return 0; /* continue iteration */
}
/*
* Abort all in-flight requests with @err and arrange for all future
* requests to be failed immediately.
*/
void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err)
{
dout("%s osdc %p err %d\n", __func__, osdc, err);
down_write(&osdc->lock);
for_each_request(osdc, abort_fn, &err);
osdc->abort_err = err;
up_write(&osdc->lock);
}
EXPORT_SYMBOL(ceph_osdc_abort_requests);
static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
{
if (likely(eb > osdc->epoch_barrier)) {
......@@ -2362,6 +2425,30 @@ void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
}
EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
/*
* We can end up releasing caps as a result of abort_request().
* In that case, we probably want to ensure that the cap release message
* has an updated epoch barrier in it, so set the epoch barrier prior to
* aborting the first request.
*/
static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
{
struct ceph_osd_client *osdc = req->r_osdc;
bool *victims = arg;
if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
(ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
pool_full(osdc, req->r_t.base_oloc.pool))) {
if (!*victims) {
update_epoch_barrier(osdc, osdc->osdmap->epoch);
*victims = true;
}
abort_request(req, -ENOSPC);
}
return 0; /* continue iteration */
}
/*
* Drop all pending requests that are stalled waiting on a full condition to
* clear, and complete them with ENOSPC as the return code. Set the
......@@ -2370,61 +2457,11 @@ EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
*/
static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
{
struct rb_node *n;
bool victims = false;
dout("enter abort_on_full\n");
if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
goto out;
/* Scan list and see if there is anything to abort */
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
struct rb_node *m;
m = rb_first(&osd->o_requests);
while (m) {
struct ceph_osd_request *req = rb_entry(m,
struct ceph_osd_request, r_node);
m = rb_next(m);
if (req->r_abort_on_full) {
victims = true;
break;
}
}
if (victims)
break;
}
if (!victims)
goto out;
/*
* Update the barrier to current epoch if it's behind that point,
* since we know we have some calls to be aborted in the tree.
*/
update_epoch_barrier(osdc, osdc->osdmap->epoch);
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
struct rb_node *m;
m = rb_first(&osd->o_requests);
while (m) {
struct ceph_osd_request *req = rb_entry(m,
struct ceph_osd_request, r_node);
m = rb_next(m);
if (req->r_abort_on_full &&
(ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
pool_full(osdc, req->r_t.target_oloc.pool)))
abort_request(req, -ENOSPC);
}
}
out:
dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
if (osdc->abort_on_full &&
(ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
for_each_request(osdc, abort_on_full_fn, &victims);
}
static void check_pool_dne(struct ceph_osd_request *req)
......@@ -3541,8 +3578,6 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
up_read(&osdc->lock);
__complete_request(req);
complete_all(&req->r_completion);
ceph_osdc_put_request(req);
return;
fail_request:
......@@ -4927,7 +4962,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
if (ret)
goto out_put_req;
osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
if (ret)
goto out_put_req;
if (req_page)
osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
0, false, false);
......@@ -4996,6 +5034,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
if (!osdc->notify_wq)
goto out_msgpool_reply;
osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
if (!osdc->completion_wq)
goto out_notify_wq;
schedule_delayed_work(&osdc->timeout_work,
osdc->client->options->osd_keepalive_timeout);
schedule_delayed_work(&osdc->osds_timeout_work,
......@@ -5003,6 +5045,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
return 0;
out_notify_wq:
destroy_workqueue(osdc->notify_wq);
out_msgpool_reply:
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
out_msgpool:
......@@ -5017,7 +5061,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
flush_workqueue(osdc->notify_wq);
destroy_workqueue(osdc->completion_wq);
destroy_workqueue(osdc->notify_wq);
cancel_delayed_work_sync(&osdc->timeout_work);
cancel_delayed_work_sync(&osdc->osds_timeout_work);
......
......@@ -2146,10 +2146,10 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting,
* Should only be called with target_oid and target_oloc (as opposed to
* base_oid and base_oloc), since tiering isn't taken into account.
*/
int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
const struct ceph_object_id *oid,
const struct ceph_object_locator *oloc,
struct ceph_pg *raw_pgid)
void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
const struct ceph_object_id *oid,
const struct ceph_object_locator *oloc,
struct ceph_pg *raw_pgid)
{
WARN_ON(pi->id != oloc->pool);
......@@ -2165,11 +2165,8 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
int nsl = oloc->pool_ns->len;
size_t total = nsl + 1 + oid->name_len;
if (total > sizeof(stack_buf)) {
buf = kmalloc(total, GFP_NOIO);
if (!buf)
return -ENOMEM;
}
if (total > sizeof(stack_buf))
buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL);
memcpy(buf, oloc->pool_ns->str, nsl);
buf[nsl] = '\037';
memcpy(buf + nsl + 1, oid->name, oid->name_len);
......@@ -2181,7 +2178,6 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi,
oid->name, nsl, oloc->pool_ns->str,
raw_pgid->pool, raw_pgid->seed);
}
return 0;
}
int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
......@@ -2195,7 +2191,8 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
if (!pi)
return -ENOENT;
return __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
__ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid);
return 0;
}
EXPORT_SYMBOL(ceph_object_locator_to_pg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册