提交 59331c21 编写于 作者: L Linus Torvalds

Merge tag 'ceph-for-4.10-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "A varied set of changes:

   - a large rework of cephx auth code to cope with CONFIG_VMAP_STACK
     (myself). Also fixed a deadlock caused by a bogus allocation on the
     writeback path and authorize reply verification.

   - a fix for long stalls during fsync (Jeff Layton). The client now
     has a way to force the MDS log flush, leading to ~100x speedups in
     some synthetic tests.

   - a new [no]require_active_mds mount option (Zheng Yan).

     On mount, we will now check whether any of the MDSes are available
     and bail rather than block if none are. This check can be avoided
     by specifying the "no" option.

   - a couple of MDS cap handling fixes and a few assorted patches
     throughout"

* tag 'ceph-for-4.10-rc1' of git://github.com/ceph/ceph-client: (32 commits)
  libceph: remove now unused finish_request() wrapper
  libceph: always signal completion when done
  ceph: avoid creating orphan object when checking pool permission
  ceph: properly set issue_seq for cap release
  ceph: add flags parameter to send_cap_msg
  ceph: update cap message struct version to 10
  ceph: define new argument structure for send_cap_msg
  ceph: move xattr initialzation before the encoding past the ceph_mds_caps
  ceph: fix minor typo in unsafe_request_wait
  ceph: record truncate size/seq for snap data writeback
  ceph: check availability of mds cluster on mount
  ceph: fix splice read for no Fc capability case
  ceph: try getting buffer capability for readahead/fadvise
  ceph: fix scheduler warning due to nested blocking
  ceph: fix printing wrong return variable in ceph_direct_read_write()
  crush: include mapper.h in mapper.c
  rbd: silence bogus -Wmaybe-uninitialized warning
  libceph: no need to drop con->mutex for ->get_authorizer()
  libceph: drop len argument of *verify_authorizer_reply()
  libceph: verify authorize reply on connect
  ...
......@@ -3756,7 +3756,7 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
struct rbd_device *rbd_dev = arg;
void *p = data;
void *const end = p + data_len;
u8 struct_v;
u8 struct_v = 0;
u32 len;
u32 notify_op;
int ret;
......
......@@ -315,7 +315,32 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
struct page **pages;
pgoff_t next_index;
int nr_pages = 0;
int ret;
int got = 0;
int ret = 0;
if (!current->journal_info) {
/* caller of readpages does not hold buffer and read caps
* (fadvise, madvise and readahead cases) */
int want = CEPH_CAP_FILE_CACHE;
ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
if (ret < 0) {
dout("start_read %p, error getting cap\n", inode);
} else if (!(got & want)) {
dout("start_read %p, no cache cap\n", inode);
ret = 0;
}
if (ret <= 0) {
if (got)
ceph_put_cap_refs(ci, got);
while (!list_empty(page_list)) {
page = list_entry(page_list->prev,
struct page, lru);
list_del(&page->lru);
put_page(page);
}
return ret;
}
}
off = (u64) page_offset(page);
......@@ -338,15 +363,18 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
CEPH_OSD_FLAG_READ, NULL,
ci->i_truncate_seq, ci->i_truncate_size,
false);
if (IS_ERR(req))
return PTR_ERR(req);
if (IS_ERR(req)) {
ret = PTR_ERR(req);
goto out;
}
/* build page vector */
nr_pages = calc_pages_for(0, len);
pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
ret = -ENOMEM;
if (!pages)
goto out;
if (!pages) {
ret = -ENOMEM;
goto out_put;
}
for (i = 0; i < nr_pages; ++i) {
page = list_entry(page_list->prev, struct page, lru);
BUG_ON(PageLocked(page));
......@@ -378,6 +406,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
if (ret < 0)
goto out_pages;
ceph_osdc_put_request(req);
/* After adding locked pages to page cache, the inode holds cache cap.
* So we can drop our cap refs. */
if (got)
ceph_put_cap_refs(ci, got);
return nr_pages;
out_pages:
......@@ -386,8 +420,11 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
unlock_page(pages[i]);
}
ceph_put_page_vector(pages, nr_pages, false);
out:
out_put:
ceph_osdc_put_request(req);
out:
if (got)
ceph_put_cap_refs(ci, got);
return ret;
}
......@@ -424,7 +461,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
rc = start_read(inode, page_list, max);
if (rc < 0)
goto out;
BUG_ON(rc == 0);
}
out:
ceph_fscache_readpages_cancel(inode, page_list);
......@@ -438,7 +474,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
* only snap context we are allowed to write back.
*/
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
loff_t *snap_size)
loff_t *snap_size,
u64 *truncate_size,
u32 *truncate_seq)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_snap_context *snapc = NULL;
......@@ -452,6 +490,10 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
snapc = ceph_get_snap_context(capsnap->context);
if (snap_size)
*snap_size = capsnap->size;
if (truncate_size)
*truncate_size = capsnap->truncate_size;
if (truncate_seq)
*truncate_seq = capsnap->truncate_seq;
break;
}
}
......@@ -459,6 +501,10 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
snapc = ceph_get_snap_context(ci->i_head_snapc);
dout(" head snapc %p has %d dirty pages\n",
snapc, ci->i_wrbuffer_ref_head);
if (truncate_size)
*truncate_size = capsnap->truncate_size;
if (truncate_seq)
*truncate_seq = capsnap->truncate_seq;
}
spin_unlock(&ci->i_ceph_lock);
return snapc;
......@@ -501,7 +547,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
dout("writepage %p page %p not dirty?\n", inode, page);
goto out;
}
oldest = get_oldest_context(inode, &snap_size);
oldest = get_oldest_context(inode, &snap_size,
&truncate_size, &truncate_seq);
if (snapc->seq > oldest->seq) {
dout("writepage %p page %p snapc %p not writeable - noop\n",
inode, page, snapc);
......@@ -512,12 +559,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
}
ceph_put_snap_context(oldest);
spin_lock(&ci->i_ceph_lock);
truncate_seq = ci->i_truncate_seq;
truncate_size = ci->i_truncate_size;
if (snap_size == -1)
snap_size = i_size_read(inode);
spin_unlock(&ci->i_ceph_lock);
/* is this a partial page at end of file? */
if (page_off >= snap_size) {
......@@ -764,7 +807,8 @@ static int ceph_writepages_start(struct address_space *mapping,
/* find oldest snap context with dirty data */
ceph_put_snap_context(snapc);
snap_size = -1;
snapc = get_oldest_context(inode, &snap_size);
snapc = get_oldest_context(inode, &snap_size,
&truncate_size, &truncate_seq);
if (!snapc) {
/* hmm, why does writepages get called when there
is no dirty data? */
......@@ -774,11 +818,7 @@ static int ceph_writepages_start(struct address_space *mapping,
dout(" oldest snapc is %p seq %lld (%d snaps)\n",
snapc, snapc->seq, snapc->num_snaps);
spin_lock(&ci->i_ceph_lock);
truncate_seq = ci->i_truncate_seq;
truncate_size = ci->i_truncate_size;
i_size = i_size_read(inode);
spin_unlock(&ci->i_ceph_lock);
if (last_snapc && snapc != last_snapc) {
/* if we switched to a newer snapc, restart our scan at the
......@@ -1124,7 +1164,8 @@ static int ceph_writepages_start(struct address_space *mapping,
static int context_is_writeable_or_written(struct inode *inode,
struct ceph_snap_context *snapc)
{
struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
struct ceph_snap_context *oldest = get_oldest_context(inode, NULL,
NULL, NULL);
int ret = !oldest || snapc->seq <= oldest->seq;
ceph_put_snap_context(oldest);
......@@ -1169,7 +1210,7 @@ static int ceph_update_writeable_page(struct file *file,
* this page is already dirty in another (older) snap
* context! is it writeable now?
*/
oldest = get_oldest_context(inode, NULL);
oldest = get_oldest_context(inode, NULL, NULL, NULL);
if (snapc->seq > oldest->seq) {
ceph_put_snap_context(oldest);
......@@ -1371,9 +1412,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
ci->i_inline_version == CEPH_INLINE_NONE)
ci->i_inline_version == CEPH_INLINE_NONE) {
current->journal_info = vma->vm_file;
ret = filemap_fault(vma, vmf);
else
current->journal_info = NULL;
} else
ret = -EAGAIN;
dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
......@@ -1905,6 +1948,15 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
struct ceph_string *pool_ns;
int ret, flags;
if (ci->i_vino.snap != CEPH_NOSNAP) {
/*
* Pool permission check needs to write to the first object.
* But for snapshot, head of the first object may have alread
* been deleted. Skip check to avoid creating orphan object.
*/
return 0;
}
if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
NOPOOLPERM))
return 0;
......
......@@ -987,96 +987,127 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
__cap_delay_cancel(mdsc, ci);
}
struct cap_msg_args {
struct ceph_mds_session *session;
u64 ino, cid, follows;
u64 flush_tid, oldest_flush_tid, size, max_size;
u64 xattr_version;
struct ceph_buffer *xattr_buf;
struct timespec atime, mtime, ctime;
int op, caps, wanted, dirty;
u32 seq, issue_seq, mseq, time_warp_seq;
u32 flags;
kuid_t uid;
kgid_t gid;
umode_t mode;
bool inline_data;
};
/*
* Build and send a cap message to the given MDS.
*
* Caller should be holding s_mutex.
*/
static int send_cap_msg(struct ceph_mds_session *session,
u64 ino, u64 cid, int op,
int caps, int wanted, int dirty,
u32 seq, u64 flush_tid, u64 oldest_flush_tid,
u32 issue_seq, u32 mseq, u64 size, u64 max_size,
struct timespec *mtime, struct timespec *atime,
struct timespec *ctime, u32 time_warp_seq,
kuid_t uid, kgid_t gid, umode_t mode,
u64 xattr_version,
struct ceph_buffer *xattrs_buf,
u64 follows, bool inline_data)
static int send_cap_msg(struct cap_msg_args *arg)
{
struct ceph_mds_caps *fc;
struct ceph_msg *msg;
void *p;
size_t extra_len;
struct timespec zerotime = {0};
dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
" seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
" xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op),
cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted),
ceph_cap_string(dirty),
seq, issue_seq, flush_tid, oldest_flush_tid,
mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
" xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op),
arg->cid, arg->ino, ceph_cap_string(arg->caps),
ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty),
arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid,
arg->mseq, arg->follows, arg->size, arg->max_size,
arg->xattr_version,
arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);
/* flock buffer size + inline version + inline data size +
* osd_epoch_barrier + oldest_flush_tid */
extra_len = 4 + 8 + 4 + 4 + 8;
extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4;
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
GFP_NOFS, false);
if (!msg)
return -ENOMEM;
msg->hdr.version = cpu_to_le16(6);
msg->hdr.tid = cpu_to_le64(flush_tid);
msg->hdr.version = cpu_to_le16(10);
msg->hdr.tid = cpu_to_le64(arg->flush_tid);
fc = msg->front.iov_base;
memset(fc, 0, sizeof(*fc));
fc->cap_id = cpu_to_le64(cid);
fc->op = cpu_to_le32(op);
fc->seq = cpu_to_le32(seq);
fc->issue_seq = cpu_to_le32(issue_seq);
fc->migrate_seq = cpu_to_le32(mseq);
fc->caps = cpu_to_le32(caps);
fc->wanted = cpu_to_le32(wanted);
fc->dirty = cpu_to_le32(dirty);
fc->ino = cpu_to_le64(ino);
fc->snap_follows = cpu_to_le64(follows);
fc->size = cpu_to_le64(size);
fc->max_size = cpu_to_le64(max_size);
if (mtime)
ceph_encode_timespec(&fc->mtime, mtime);
if (atime)
ceph_encode_timespec(&fc->atime, atime);
if (ctime)
ceph_encode_timespec(&fc->ctime, ctime);
fc->time_warp_seq = cpu_to_le32(time_warp_seq);
fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid));
fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
fc->mode = cpu_to_le32(mode);
fc->cap_id = cpu_to_le64(arg->cid);
fc->op = cpu_to_le32(arg->op);
fc->seq = cpu_to_le32(arg->seq);
fc->issue_seq = cpu_to_le32(arg->issue_seq);
fc->migrate_seq = cpu_to_le32(arg->mseq);
fc->caps = cpu_to_le32(arg->caps);
fc->wanted = cpu_to_le32(arg->wanted);
fc->dirty = cpu_to_le32(arg->dirty);
fc->ino = cpu_to_le64(arg->ino);
fc->snap_follows = cpu_to_le64(arg->follows);
fc->size = cpu_to_le64(arg->size);
fc->max_size = cpu_to_le64(arg->max_size);
ceph_encode_timespec(&fc->mtime, &arg->mtime);
ceph_encode_timespec(&fc->atime, &arg->atime);
ceph_encode_timespec(&fc->ctime, &arg->ctime);
fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq);
fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid));
fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid));
fc->mode = cpu_to_le32(arg->mode);
fc->xattr_version = cpu_to_le64(arg->xattr_version);
if (arg->xattr_buf) {
msg->middle = ceph_buffer_get(arg->xattr_buf);
fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
}
p = fc + 1;
/* flock buffer size */
/* flock buffer size (version 2) */
ceph_encode_32(&p, 0);
/* inline version */
ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
/* inline version (version 4) */
ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
/* inline data size */
ceph_encode_32(&p, 0);
/* osd_epoch_barrier */
/* osd_epoch_barrier (version 5) */
ceph_encode_32(&p, 0);
/* oldest_flush_tid */
ceph_encode_64(&p, oldest_flush_tid);
/* oldest_flush_tid (version 6) */
ceph_encode_64(&p, arg->oldest_flush_tid);
fc->xattr_version = cpu_to_le64(xattr_version);
if (xattrs_buf) {
msg->middle = ceph_buffer_get(xattrs_buf);
fc->xattr_len = cpu_to_le32(xattrs_buf->vec.iov_len);
msg->hdr.middle_len = cpu_to_le32(xattrs_buf->vec.iov_len);
}
/*
* caller_uid/caller_gid (version 7)
*
* Currently, we don't properly track which caller dirtied the caps
* last, and force a flush of them when there is a conflict. For now,
* just set this to 0:0, to emulate how the MDS has worked up to now.
*/
ceph_encode_32(&p, 0);
ceph_encode_32(&p, 0);
/* pool namespace (version 8) (mds always ignores this) */
ceph_encode_32(&p, 0);
ceph_con_send(&session->s_con, msg);
/*
* btime and change_attr (version 9)
*
* We just zero these out for now, as the MDS ignores them unless
* the requisite feature flags are set (which we don't do yet).
*/
ceph_encode_timespec(p, &zerotime);
p += sizeof(struct ceph_timespec);
ceph_encode_64(&p, 0);
/* Advisory flags (version 10) */
ceph_encode_32(&p, arg->flags);
ceph_con_send(&arg->session->s_con, msg);
return 0;
}
......@@ -1115,27 +1146,17 @@ void ceph_queue_caps_release(struct inode *inode)
* caller should hold snap_rwsem (read), s_mutex.
*/
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
int op, int used, int want, int retain, int flushing,
u64 flush_tid, u64 oldest_flush_tid)
int op, bool sync, int used, int want, int retain,
int flushing, u64 flush_tid, u64 oldest_flush_tid)
__releases(cap->ci->i_ceph_lock)
{
struct ceph_inode_info *ci = cap->ci;
struct inode *inode = &ci->vfs_inode;
u64 cap_id = cap->cap_id;
int held, revoking, dropping, keep;
u64 follows, size, max_size;
u32 seq, issue_seq, mseq, time_warp_seq;
struct timespec mtime, atime, ctime;
struct cap_msg_args arg;
int held, revoking, dropping;
int wake = 0;
umode_t mode;
kuid_t uid;
kgid_t gid;
struct ceph_mds_session *session;
u64 xattr_version = 0;
struct ceph_buffer *xattr_blob = NULL;
int delayed = 0;
int ret;
bool inline_data;
held = cap->issued | cap->implemented;
revoking = cap->implemented & ~cap->issued;
......@@ -1148,7 +1169,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
ceph_cap_string(revoking));
BUG_ON((retain & CEPH_CAP_PIN) == 0);
session = cap->session;
arg.session = cap->session;
/* don't release wanted unless we've waited a bit. */
if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
......@@ -1177,40 +1198,51 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
cap->implemented &= cap->issued | used;
cap->mds_wanted = want;
follows = flushing ? ci->i_head_snapc->seq : 0;
keep = cap->implemented;
seq = cap->seq;
issue_seq = cap->issue_seq;
mseq = cap->mseq;
size = inode->i_size;
ci->i_reported_size = size;
max_size = ci->i_wanted_max_size;
ci->i_requested_max_size = max_size;
mtime = inode->i_mtime;
atime = inode->i_atime;
ctime = inode->i_ctime;
time_warp_seq = ci->i_time_warp_seq;
uid = inode->i_uid;
gid = inode->i_gid;
mode = inode->i_mode;
arg.ino = ceph_vino(inode).ino;
arg.cid = cap->cap_id;
arg.follows = flushing ? ci->i_head_snapc->seq : 0;
arg.flush_tid = flush_tid;
arg.oldest_flush_tid = oldest_flush_tid;
arg.size = inode->i_size;
ci->i_reported_size = arg.size;
arg.max_size = ci->i_wanted_max_size;
ci->i_requested_max_size = arg.max_size;
if (flushing & CEPH_CAP_XATTR_EXCL) {
__ceph_build_xattrs_blob(ci);
xattr_blob = ci->i_xattrs.blob;
xattr_version = ci->i_xattrs.version;
arg.xattr_version = ci->i_xattrs.version;
arg.xattr_buf = ci->i_xattrs.blob;
} else {
arg.xattr_buf = NULL;
}
inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
arg.mtime = inode->i_mtime;
arg.atime = inode->i_atime;
arg.ctime = inode->i_ctime;
arg.op = op;
arg.caps = cap->implemented;
arg.wanted = want;
arg.dirty = flushing;
arg.seq = cap->seq;
arg.issue_seq = cap->issue_seq;
arg.mseq = cap->mseq;
arg.time_warp_seq = ci->i_time_warp_seq;
arg.uid = inode->i_uid;
arg.gid = inode->i_gid;
arg.mode = inode->i_mode;
arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
arg.flags = 0;
if (sync)
arg.flags |= CEPH_CLIENT_CAPS_SYNC;
spin_unlock(&ci->i_ceph_lock);
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq,
flush_tid, oldest_flush_tid, issue_seq, mseq,
size, max_size, &mtime, &atime, &ctime, time_warp_seq,
uid, gid, mode, xattr_version, xattr_blob,
follows, inline_data);
ret = send_cap_msg(&arg);
if (ret < 0) {
dout("error sending cap msg, must requeue %p\n", inode);
delayed = 1;
......@@ -1227,15 +1259,42 @@ static inline int __send_flush_snap(struct inode *inode,
struct ceph_cap_snap *capsnap,
u32 mseq, u64 oldest_flush_tid)
{
return send_cap_msg(session, ceph_vino(inode).ino, 0,
CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
capsnap->dirty, 0, capsnap->cap_flush.tid,
oldest_flush_tid, 0, mseq, capsnap->size, 0,
&capsnap->mtime, &capsnap->atime,
&capsnap->ctime, capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode,
capsnap->xattr_version, capsnap->xattr_blob,
capsnap->follows, capsnap->inline_data);
struct cap_msg_args arg;
arg.session = session;
arg.ino = ceph_vino(inode).ino;
arg.cid = 0;
arg.follows = capsnap->follows;
arg.flush_tid = capsnap->cap_flush.tid;
arg.oldest_flush_tid = oldest_flush_tid;
arg.size = capsnap->size;
arg.max_size = 0;
arg.xattr_version = capsnap->xattr_version;
arg.xattr_buf = capsnap->xattr_blob;
arg.atime = capsnap->atime;
arg.mtime = capsnap->mtime;
arg.ctime = capsnap->ctime;
arg.op = CEPH_CAP_OP_FLUSHSNAP;
arg.caps = capsnap->issued;
arg.wanted = 0;
arg.dirty = capsnap->dirty;
arg.seq = 0;
arg.issue_seq = 0;
arg.mseq = mseq;
arg.time_warp_seq = capsnap->time_warp_seq;
arg.uid = capsnap->uid;
arg.gid = capsnap->gid;
arg.mode = capsnap->mode;
arg.inline_data = capsnap->inline_data;
arg.flags = 0;
return send_cap_msg(&arg);
}
/*
......@@ -1858,9 +1917,9 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
sent++;
/* __send_cap drops i_ceph_lock */
delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
want, retain, flushing,
flush_tid, oldest_flush_tid);
delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false,
cap_used, want, retain, flushing,
flush_tid, oldest_flush_tid);
goto retry; /* retake i_ceph_lock and restart our cap scan. */
}
......@@ -1924,9 +1983,9 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
&flush_tid, &oldest_flush_tid);
/* __send_cap drops i_ceph_lock */
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
(cap->issued | cap->implemented),
flushing, flush_tid, oldest_flush_tid);
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true,
used, want, (cap->issued | cap->implemented),
flushing, flush_tid, oldest_flush_tid);
if (delayed) {
spin_lock(&ci->i_ceph_lock);
......@@ -1996,7 +2055,7 @@ static int unsafe_request_wait(struct inode *inode)
}
spin_unlock(&ci->i_unsafe_lock);
dout("unsafe_requeset_wait %p wait on tid %llu %llu\n",
dout("unsafe_request_wait %p wait on tid %llu %llu\n",
inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
if (req1) {
ret = !wait_for_completion_timeout(&req1->r_safe_completion,
......@@ -2119,7 +2178,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
inode, cap, cf->tid, ceph_cap_string(cf->caps));
ci->i_ceph_flags |= CEPH_I_NODELAY;
ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
__ceph_caps_used(ci),
false, __ceph_caps_used(ci),
__ceph_caps_wanted(ci),
cap->issued | cap->implemented,
cf->caps, cf->tid, oldest_flush_tid);
......@@ -2479,6 +2538,27 @@ static void check_max_size(struct inode *inode, loff_t endoff)
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
}
int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
{
int ret, err = 0;
BUG_ON(need & ~CEPH_CAP_FILE_RD);
BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
ret = ceph_pool_perm_check(ci, need);
if (ret < 0)
return ret;
ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
if (ret) {
if (err == -EAGAIN) {
ret = 0;
} else if (err < 0) {
ret = err;
}
}
return ret;
}
/*
* Wait for caps, and take cap references. If we can't get a WR cap
* due to a small max_size, make sure we check_max_size (and possibly
......@@ -2507,9 +2587,15 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
if (err < 0)
ret = err;
} else {
ret = wait_event_interruptible(ci->i_cap_wq,
try_get_cap_refs(ci, need, want, endoff,
true, &_got, &err));
DEFINE_WAIT_FUNC(wait, woken_wake_function);
add_wait_queue(&ci->i_cap_wq, &wait);
while (!try_get_cap_refs(ci, need, want, endoff,
true, &_got, &err))
wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
remove_wait_queue(&ci->i_cap_wq, &wait);
if (err == -EAGAIN)
continue;
if (err < 0)
......@@ -3570,6 +3656,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
cap->cap_id = le64_to_cpu(h->cap_id);
cap->mseq = mseq;
cap->seq = seq;
cap->issue_seq = seq;
spin_lock(&session->s_cap_lock);
list_add_tail(&cap->session_caps,
&session->s_cap_releases);
......
......@@ -454,71 +454,60 @@ enum {
* only return a short read to the caller if we hit EOF.
*/
static int striped_read(struct inode *inode,
u64 off, u64 len,
u64 pos, u64 len,
struct page **pages, int num_pages,
int *checkeof)
int page_align, int *checkeof)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode);
u64 pos, this_len, left;
u64 this_len;
loff_t i_size;
int page_align, pages_left;
int read, ret;
struct page **page_pos;
int page_idx;
int ret, read = 0;
bool hit_stripe, was_short;
/*
* we may need to do multiple reads. not atomic, unfortunately.
*/
pos = off;
left = len;
page_pos = pages;
pages_left = num_pages;
read = 0;
more:
page_align = pos & ~PAGE_MASK;
this_len = left;
this_len = len;
page_idx = (page_align + read) >> PAGE_SHIFT;
ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
&ci->i_layout, pos, &this_len,
ci->i_truncate_seq,
ci->i_truncate_size,
page_pos, pages_left, page_align);
ci->i_truncate_seq, ci->i_truncate_size,
pages + page_idx, num_pages - page_idx,
((page_align + read) & ~PAGE_MASK));
if (ret == -ENOENT)
ret = 0;
hit_stripe = this_len < left;
hit_stripe = this_len < len;
was_short = ret >= 0 && ret < this_len;
dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
i_size = i_size_read(inode);
if (ret >= 0) {
int didpages;
if (was_short && (pos + ret < i_size)) {
int zlen = min(this_len - ret, i_size - pos - ret);
int zoff = (off & ~PAGE_MASK) + read + ret;
int zoff = page_align + read + ret;
dout(" zero gap %llu to %llu\n",
pos + ret, pos + ret + zlen);
pos + ret, pos + ret + zlen);
ceph_zero_page_vector_range(zoff, zlen, pages);
ret += zlen;
}
didpages = (page_align + ret) >> PAGE_SHIFT;
read += ret;
pos += ret;
read = pos - off;
left -= ret;
page_pos += didpages;
pages_left -= didpages;
len -= ret;
/* hit stripe and need continue*/
if (left && hit_stripe && pos < i_size)
if (len && hit_stripe && pos < i_size)
goto more;
}
if (read > 0) {
ret = read;
/* did we bounce off eof? */
if (pos + left > i_size)
if (pos + len > i_size)
*checkeof = CHECK_EOF;
}
......@@ -532,15 +521,16 @@ static int striped_read(struct inode *inode,
*
* If the read spans object boundary, just do multiple reads.
*/
static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
int *checkeof)
static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
int *checkeof)
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
struct page **pages;
u64 off = iocb->ki_pos;
int num_pages, ret;
size_t len = iov_iter_count(i);
int num_pages;
ssize_t ret;
size_t len = iov_iter_count(to);
dout("sync_read on file %p %llu~%u %s\n", file, off,
(unsigned)len,
......@@ -559,35 +549,56 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
if (ret < 0)
return ret;
num_pages = calc_pages_for(off, len);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
ret = striped_read(inode, off, len, pages,
num_pages, checkeof);
if (ret > 0) {
int l, k = 0;
size_t left = ret;
while (left) {
size_t page_off = off & ~PAGE_MASK;
size_t copy = min_t(size_t, left,
PAGE_SIZE - page_off);
l = copy_page_to_iter(pages[k++], page_off, copy, i);
off += l;
left -= l;
if (l < copy)
break;
if (unlikely(to->type & ITER_PIPE)) {
size_t page_off;
ret = iov_iter_get_pages_alloc(to, &pages, len,
&page_off);
if (ret <= 0)
return -ENOMEM;
num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
ret = striped_read(inode, off, ret, pages, num_pages,
page_off, checkeof);
if (ret > 0) {
iov_iter_advance(to, ret);
off += ret;
} else {
iov_iter_advance(to, 0);
}
ceph_put_page_vector(pages, num_pages, false);
} else {
num_pages = calc_pages_for(off, len);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
ret = striped_read(inode, off, len, pages, num_pages,
(off & ~PAGE_MASK), checkeof);
if (ret > 0) {
int l, k = 0;
size_t left = ret;
while (left) {
size_t page_off = off & ~PAGE_MASK;
size_t copy = min_t(size_t, left,
PAGE_SIZE - page_off);
l = copy_page_to_iter(pages[k++], page_off,
copy, to);
off += l;
left -= l;
if (l < copy)
break;
}
}
ceph_release_page_vector(pages, num_pages);
}
ceph_release_page_vector(pages, num_pages);
if (off > iocb->ki_pos) {
ret = off - iocb->ki_pos;
iocb->ki_pos = off;
}
dout("sync_read result %d\n", ret);
dout("sync_read result %zd\n", ret);
return ret;
}
......@@ -849,7 +860,7 @@ void ceph_sync_write_wait(struct inode *inode)
dout("sync_write_wait on tid %llu (until %llu)\n",
req->r_tid, last_tid);
wait_for_completion(&req->r_safe_completion);
wait_for_completion(&req->r_done_completion);
ceph_osdc_put_request(req);
spin_lock(&ci->i_unsafe_lock);
......@@ -902,7 +913,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
pos >> PAGE_SHIFT,
(pos + count) >> PAGE_SHIFT);
if (ret2 < 0)
dout("invalidate_inode_pages2_range returned %d\n", ret);
dout("invalidate_inode_pages2_range returned %d\n", ret2);
flags = CEPH_OSD_FLAG_ORDERSNAP |
CEPH_OSD_FLAG_ONDISK |
......@@ -1245,8 +1256,9 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
ceph_cap_string(got));
current->journal_info = filp;
ret = generic_file_read_iter(iocb, to);
current->journal_info = NULL;
}
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
......@@ -1766,6 +1778,7 @@ const struct file_operations ceph_file_fops = {
.fsync = ceph_fsync,
.lock = ceph_lock,
.flock = ceph_flock,
.splice_read = generic_file_splice_read,
.splice_write = iter_file_splice_write,
.unlocked_ioctl = ceph_ioctl,
.compat_ioctl = ceph_ioctl,
......
......@@ -2100,17 +2100,26 @@ static int __do_request(struct ceph_mds_client *mdsc,
err = -EIO;
goto finish;
}
if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
if (mdsc->mdsmap_err) {
err = mdsc->mdsmap_err;
dout("do_request mdsmap err %d\n", err);
goto finish;
}
if (!(mdsc->fsc->mount_options->flags &
CEPH_MOUNT_OPT_MOUNTWAIT) &&
!ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
err = -ENOENT;
pr_info("probably no mds server is up\n");
goto finish;
}
}
put_request_session(req);
mds = __choose_mds(mdsc, req);
if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
if (mdsc->mdsmap_err) {
err = mdsc->mdsmap_err;
dout("do_request mdsmap err %d\n", err);
goto finish;
}
dout("do_request no mds or not active, waiting for map\n");
list_add(&req->r_wait, &mdsc->waiting_for_map);
goto out;
......@@ -3943,13 +3952,13 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
}
static int verify_authorizer_reply(struct ceph_connection *con, int len)
static int verify_authorizer_reply(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;
struct ceph_mds_client *mdsc = s->s_mdsc;
struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
}
static int invalidate_authorizer(struct ceph_connection *con)
......
......@@ -42,6 +42,60 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
return i;
}
#define __decode_and_drop_type(p, end, type, bad) \
do { \
if (*p + sizeof(type) > end) \
goto bad; \
*p += sizeof(type); \
} while (0)
#define __decode_and_drop_set(p, end, type, bad) \
do { \
u32 n; \
size_t need; \
ceph_decode_32_safe(p, end, n, bad); \
need = sizeof(type) * n; \
ceph_decode_need(p, end, need, bad); \
*p += need; \
} while (0)
#define __decode_and_drop_map(p, end, ktype, vtype, bad) \
do { \
u32 n; \
size_t need; \
ceph_decode_32_safe(p, end, n, bad); \
need = (sizeof(ktype) + sizeof(vtype)) * n; \
ceph_decode_need(p, end, need, bad); \
*p += need; \
} while (0)
static int __decode_and_drop_compat_set(void **p, void* end)
{
int i;
/* compat, ro_compat, incompat*/
for (i = 0; i < 3; i++) {
u32 n;
ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);
/* mask */
*p += sizeof(u64);
/* names (map<u64, string>) */
n = ceph_decode_32(p);
while (n-- > 0) {
u32 len;
ceph_decode_need(p, end, sizeof(u64) + sizeof(u32),
bad);
*p += sizeof(u64);
len = ceph_decode_32(p);
ceph_decode_need(p, end, len, bad);
*p += len;
}
}
return 0;
bad:
return -1;
}
/*
* Decode an MDS map
*
......@@ -55,6 +109,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
int i, j, n;
int err = -EINVAL;
u8 mdsmap_v, mdsmap_cv;
u16 mdsmap_ev;
m = kzalloc(sizeof(*m), GFP_NOFS);
if (m == NULL)
......@@ -83,7 +138,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_info = kcalloc(m->m_max_mds, sizeof(*m->m_info), GFP_NOFS);
if (m->m_info == NULL)
goto badmem;
goto nomem;
/* pick out active nodes from mds_info (state > 0) */
n = ceph_decode_32(p);
......@@ -166,7 +221,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
info->export_targets = kcalloc(num_export_targets,
sizeof(u32), GFP_NOFS);
if (info->export_targets == NULL)
goto badmem;
goto nomem;
for (j = 0; j < num_export_targets; j++)
info->export_targets[j] =
ceph_decode_32(&pexport_targets);
......@@ -180,24 +235,104 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_num_data_pg_pools = n;
m->m_data_pg_pools = kcalloc(n, sizeof(u64), GFP_NOFS);
if (!m->m_data_pg_pools)
goto badmem;
goto nomem;
ceph_decode_need(p, end, sizeof(u64)*(n+1), bad);
for (i = 0; i < n; i++)
m->m_data_pg_pools[i] = ceph_decode_64(p);
m->m_cas_pg_pool = ceph_decode_64(p);
m->m_enabled = m->m_epoch > 1;
mdsmap_ev = 1;
if (mdsmap_v >= 2) {
ceph_decode_16_safe(p, end, mdsmap_ev, bad_ext);
}
if (mdsmap_ev >= 3) {
if (__decode_and_drop_compat_set(p, end) < 0)
goto bad_ext;
}
/* metadata_pool */
if (mdsmap_ev < 5) {
__decode_and_drop_type(p, end, u32, bad_ext);
} else {
__decode_and_drop_type(p, end, u64, bad_ext);
}
/* ok, we don't care about the rest. */
/* created + modified + tableserver */
__decode_and_drop_type(p, end, struct ceph_timespec, bad_ext);
__decode_and_drop_type(p, end, struct ceph_timespec, bad_ext);
__decode_and_drop_type(p, end, u32, bad_ext);
/* in */
{
int num_laggy = 0;
ceph_decode_32_safe(p, end, n, bad_ext);
ceph_decode_need(p, end, sizeof(u32) * n, bad_ext);
for (i = 0; i < n; i++) {
s32 mds = ceph_decode_32(p);
if (mds >= 0 && mds < m->m_max_mds) {
if (m->m_info[mds].laggy)
num_laggy++;
}
}
m->m_num_laggy = num_laggy;
}
/* inc */
__decode_and_drop_map(p, end, u32, u32, bad_ext);
/* up */
__decode_and_drop_map(p, end, u32, u64, bad_ext);
/* failed */
__decode_and_drop_set(p, end, u32, bad_ext);
/* stopped */
__decode_and_drop_set(p, end, u32, bad_ext);
if (mdsmap_ev >= 4) {
/* last_failure_osd_epoch */
__decode_and_drop_type(p, end, u32, bad_ext);
}
if (mdsmap_ev >= 6) {
/* ever_allowed_snaps */
__decode_and_drop_type(p, end, u8, bad_ext);
/* explicitly_allowed_snaps */
__decode_and_drop_type(p, end, u8, bad_ext);
}
if (mdsmap_ev >= 7) {
/* inline_data_enabled */
__decode_and_drop_type(p, end, u8, bad_ext);
}
if (mdsmap_ev >= 8) {
u32 name_len;
/* enabled */
ceph_decode_8_safe(p, end, m->m_enabled, bad_ext);
ceph_decode_32_safe(p, end, name_len, bad_ext);
ceph_decode_need(p, end, name_len, bad_ext);
*p += name_len;
}
/* damaged */
if (mdsmap_ev >= 9) {
size_t need;
ceph_decode_32_safe(p, end, n, bad_ext);
need = sizeof(u32) * n;
ceph_decode_need(p, end, need, bad_ext);
*p += need;
m->m_damaged = n > 0;
} else {
m->m_damaged = false;
}
bad_ext:
*p = end;
dout("mdsmap_decode success epoch %u\n", m->m_epoch);
return m;
badmem:
nomem:
err = -ENOMEM;
goto out_err;
bad:
pr_err("corrupt mdsmap\n");
print_hex_dump(KERN_DEBUG, "mdsmap: ",
DUMP_PREFIX_OFFSET, 16, 1,
start, end - start, true);
out_err:
ceph_mdsmap_destroy(m);
return ERR_PTR(err);
}
......@@ -212,3 +347,19 @@ void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
kfree(m->m_data_pg_pools);
kfree(m);
}
bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m)
{
int i, nr_active = 0;
if (!m->m_enabled)
return false;
if (m->m_damaged)
return false;
if (m->m_num_laggy > 0)
return false;
for (i = 0; i < m->m_max_mds; i++) {
if (m->m_info[i].state == CEPH_MDS_STATE_ACTIVE)
nr_active++;
}
return nr_active > 0;
}
......@@ -593,6 +593,8 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
capsnap->atime = inode->i_atime;
capsnap->ctime = inode->i_ctime;
capsnap->time_warp_seq = ci->i_time_warp_seq;
capsnap->truncate_size = ci->i_truncate_size;
capsnap->truncate_seq = ci->i_truncate_seq;
if (capsnap->dirty_pages) {
dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
"still has %d dirty pages\n", inode, capsnap,
......
......@@ -137,6 +137,8 @@ enum {
Opt_nofscache,
Opt_poolperm,
Opt_nopoolperm,
Opt_require_active_mds,
Opt_norequire_active_mds,
#ifdef CONFIG_CEPH_FS_POSIX_ACL
Opt_acl,
#endif
......@@ -171,6 +173,8 @@ static match_table_t fsopt_tokens = {
{Opt_nofscache, "nofsc"},
{Opt_poolperm, "poolperm"},
{Opt_nopoolperm, "nopoolperm"},
{Opt_require_active_mds, "require_active_mds"},
{Opt_norequire_active_mds, "norequire_active_mds"},
#ifdef CONFIG_CEPH_FS_POSIX_ACL
{Opt_acl, "acl"},
#endif
......@@ -287,6 +291,12 @@ static int parse_fsopt_token(char *c, void *private)
case Opt_nopoolperm:
fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM;
break;
case Opt_require_active_mds:
fsopt->flags &= ~CEPH_MOUNT_OPT_MOUNTWAIT;
break;
case Opt_norequire_active_mds:
fsopt->flags |= CEPH_MOUNT_OPT_MOUNTWAIT;
break;
#ifdef CONFIG_CEPH_FS_POSIX_ACL
case Opt_acl:
fsopt->sb_flags |= MS_POSIXACL;
......
......@@ -36,6 +36,7 @@
#define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */
#define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */
#define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */
#define CEPH_MOUNT_OPT_MOUNTWAIT (1<<12) /* mount waits if no mds is up */
#define CEPH_MOUNT_OPT_DEFAULT CEPH_MOUNT_OPT_DCACHE
......@@ -180,6 +181,8 @@ struct ceph_cap_snap {
u64 size;
struct timespec mtime, atime, ctime;
u64 time_warp_seq;
u64 truncate_size;
u32 truncate_seq;
int writing; /* a sync write is still in progress */
int dirty_pages; /* dirty pages awaiting writeback */
bool inline_data;
......@@ -905,6 +908,8 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page);
extern int ceph_try_get_caps(struct ceph_inode_info *ci,
int need, int want, int *got);
/* for counting open files by mode */
extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);
......
......@@ -64,7 +64,7 @@ struct ceph_auth_client_ops {
int (*update_authorizer)(struct ceph_auth_client *ac, int peer_type,
struct ceph_auth_handshake *auth);
int (*verify_authorizer_reply)(struct ceph_auth_client *ac,
struct ceph_authorizer *a, size_t len);
struct ceph_authorizer *a);
void (*invalidate_authorizer)(struct ceph_auth_client *ac,
int peer_type);
......@@ -118,8 +118,7 @@ extern int ceph_auth_update_authorizer(struct ceph_auth_client *ac,
int peer_type,
struct ceph_auth_handshake *a);
extern int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac,
struct ceph_authorizer *a,
size_t len);
struct ceph_authorizer *a);
extern void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac,
int peer_type);
......
......@@ -653,6 +653,9 @@ enum {
extern const char *ceph_cap_op_name(int op);
/* flags field in client cap messages (version >= 10) */
#define CEPH_CLIENT_CAPS_SYNC (0x1)
/*
* caps message, used for capability callbacks, acks, requests, etc.
*/
......
......@@ -31,6 +31,10 @@ struct ceph_mdsmap {
int m_num_data_pg_pools;
u64 *m_data_pg_pools;
u64 m_cas_pg_pool;
bool m_enabled;
bool m_damaged;
int m_num_laggy;
};
static inline struct ceph_entity_addr *
......@@ -59,5 +63,6 @@ static inline bool ceph_mdsmap_is_laggy(struct ceph_mdsmap *m, int w)
extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m);
extern struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end);
extern void ceph_mdsmap_destroy(struct ceph_mdsmap *m);
extern bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m);
#endif
......@@ -30,7 +30,7 @@ struct ceph_connection_operations {
struct ceph_auth_handshake *(*get_authorizer) (
struct ceph_connection *con,
int *proto, int force_new);
int (*verify_authorizer_reply) (struct ceph_connection *con, int len);
int (*verify_authorizer_reply) (struct ceph_connection *con);
int (*invalidate_authorizer)(struct ceph_connection *con);
/* there was some error on the socket (disconnect, whatever) */
......
......@@ -176,7 +176,7 @@ struct ceph_osd_request {
struct kref r_kref;
bool r_mempool;
struct completion r_completion;
struct completion r_safe_completion; /* fsync waiter */
struct completion r_done_completion; /* fsync waiter */
ceph_osdc_callback_t r_callback;
ceph_osdc_unsafe_callback_t r_unsafe_callback;
struct list_head r_unsafe_item;
......
......@@ -315,13 +315,13 @@ int ceph_auth_update_authorizer(struct ceph_auth_client *ac,
EXPORT_SYMBOL(ceph_auth_update_authorizer);
int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac,
struct ceph_authorizer *a, size_t len)
struct ceph_authorizer *a)
{
int ret = 0;
mutex_lock(&ac->mutex);
if (ac->ops && ac->ops->verify_authorizer_reply)
ret = ac->ops->verify_authorizer_reply(ac, a, len);
ret = ac->ops->verify_authorizer_reply(ac, a);
mutex_unlock(&ac->mutex);
return ret;
}
......
......@@ -39,56 +39,58 @@ static int ceph_x_should_authenticate(struct ceph_auth_client *ac)
return need != 0;
}
static int ceph_x_encrypt_offset(void)
{
return sizeof(u32) + sizeof(struct ceph_x_encrypt_header);
}
static int ceph_x_encrypt_buflen(int ilen)
{
return sizeof(struct ceph_x_encrypt_header) + ilen + 16 +
sizeof(u32);
return ceph_x_encrypt_offset() + ilen + 16;
}
static int ceph_x_encrypt(struct ceph_crypto_key *secret,
void *ibuf, int ilen, void *obuf, size_t olen)
static int ceph_x_encrypt(struct ceph_crypto_key *secret, void *buf,
int buf_len, int plaintext_len)
{
struct ceph_x_encrypt_header head = {
.struct_v = 1,
.magic = cpu_to_le64(CEPHX_ENC_MAGIC)
};
size_t len = olen - sizeof(u32);
struct ceph_x_encrypt_header *hdr = buf + sizeof(u32);
int ciphertext_len;
int ret;
ret = ceph_encrypt2(secret, obuf + sizeof(u32), &len,
&head, sizeof(head), ibuf, ilen);
hdr->struct_v = 1;
hdr->magic = cpu_to_le64(CEPHX_ENC_MAGIC);
ret = ceph_crypt(secret, true, buf + sizeof(u32), buf_len - sizeof(u32),
plaintext_len + sizeof(struct ceph_x_encrypt_header),
&ciphertext_len);
if (ret)
return ret;
ceph_encode_32(&obuf, len);
return len + sizeof(u32);
ceph_encode_32(&buf, ciphertext_len);
return sizeof(u32) + ciphertext_len;
}
static int ceph_x_decrypt(struct ceph_crypto_key *secret,
void **p, void *end, void **obuf, size_t olen)
static int ceph_x_decrypt(struct ceph_crypto_key *secret, void **p, void *end)
{
struct ceph_x_encrypt_header head;
size_t head_len = sizeof(head);
int len, ret;
len = ceph_decode_32(p);
if (*p + len > end)
return -EINVAL;
struct ceph_x_encrypt_header *hdr = *p + sizeof(u32);
int ciphertext_len, plaintext_len;
int ret;
dout("ceph_x_decrypt len %d\n", len);
if (*obuf == NULL) {
*obuf = kmalloc(len, GFP_NOFS);
if (!*obuf)
return -ENOMEM;
olen = len;
}
ceph_decode_32_safe(p, end, ciphertext_len, e_inval);
ceph_decode_need(p, end, ciphertext_len, e_inval);
ret = ceph_decrypt2(secret, &head, &head_len, *obuf, &olen, *p, len);
ret = ceph_crypt(secret, false, *p, end - *p, ciphertext_len,
&plaintext_len);
if (ret)
return ret;
if (head.struct_v != 1 || le64_to_cpu(head.magic) != CEPHX_ENC_MAGIC)
if (hdr->struct_v != 1 || le64_to_cpu(hdr->magic) != CEPHX_ENC_MAGIC)
return -EPERM;
*p += len;
return olen;
*p += ciphertext_len;
return plaintext_len - sizeof(struct ceph_x_encrypt_header);
e_inval:
return -EINVAL;
}
/*
......@@ -143,13 +145,10 @@ static int process_one_ticket(struct ceph_auth_client *ac,
int type;
u8 tkt_struct_v, blob_struct_v;
struct ceph_x_ticket_handler *th;
void *dbuf = NULL;
void *dp, *dend;
int dlen;
char is_enc;
struct timespec validity;
struct ceph_crypto_key old_key;
void *ticket_buf = NULL;
void *tp, *tpend;
void **ptp;
struct ceph_crypto_key new_session_key;
......@@ -174,20 +173,17 @@ static int process_one_ticket(struct ceph_auth_client *ac,
}
/* blob for me */
dlen = ceph_x_decrypt(secret, p, end, &dbuf, 0);
if (dlen <= 0) {
ret = dlen;
dp = *p + ceph_x_encrypt_offset();
ret = ceph_x_decrypt(secret, p, end);
if (ret < 0)
goto out;
}
dout(" decrypted %d bytes\n", dlen);
dp = dbuf;
dend = dp + dlen;
dout(" decrypted %d bytes\n", ret);
dend = dp + ret;
tkt_struct_v = ceph_decode_8(&dp);
if (tkt_struct_v != 1)
goto bad;
memcpy(&old_key, &th->session_key, sizeof(old_key));
ret = ceph_crypto_key_decode(&new_session_key, &dp, dend);
if (ret)
goto out;
......@@ -203,15 +199,13 @@ static int process_one_ticket(struct ceph_auth_client *ac,
ceph_decode_8_safe(p, end, is_enc, bad);
if (is_enc) {
/* encrypted */
dout(" encrypted ticket\n");
dlen = ceph_x_decrypt(&old_key, p, end, &ticket_buf, 0);
if (dlen < 0) {
ret = dlen;
tp = *p + ceph_x_encrypt_offset();
ret = ceph_x_decrypt(&th->session_key, p, end);
if (ret < 0)
goto out;
}
tp = ticket_buf;
dout(" encrypted ticket, decrypted %d bytes\n", ret);
ptp = &tp;
tpend = *ptp + dlen;
tpend = tp + ret;
} else {
/* unencrypted */
ptp = p;
......@@ -242,8 +236,6 @@ static int process_one_ticket(struct ceph_auth_client *ac,
xi->have_keys |= th->service;
out:
kfree(ticket_buf);
kfree(dbuf);
return ret;
bad:
......@@ -294,7 +286,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
{
int maxlen;
struct ceph_x_authorize_a *msg_a;
struct ceph_x_authorize_b msg_b;
struct ceph_x_authorize_b *msg_b;
void *p, *end;
int ret;
int ticket_blob_len =
......@@ -308,8 +300,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
if (ret)
goto out_au;
maxlen = sizeof(*msg_a) + sizeof(msg_b) +
ceph_x_encrypt_buflen(ticket_blob_len);
maxlen = sizeof(*msg_a) + ticket_blob_len +
ceph_x_encrypt_buflen(sizeof(*msg_b));
dout(" need len %d\n", maxlen);
if (au->buf && au->buf->alloc_len < maxlen) {
ceph_buffer_put(au->buf);
......@@ -343,18 +335,19 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
p += ticket_blob_len;
end = au->buf->vec.iov_base + au->buf->vec.iov_len;
msg_b = p + ceph_x_encrypt_offset();
msg_b->struct_v = 1;
get_random_bytes(&au->nonce, sizeof(au->nonce));
msg_b.struct_v = 1;
msg_b.nonce = cpu_to_le64(au->nonce);
ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b),
p, end - p);
msg_b->nonce = cpu_to_le64(au->nonce);
ret = ceph_x_encrypt(&au->session_key, p, end - p, sizeof(*msg_b));
if (ret < 0)
goto out_au;
p += ret;
WARN_ON(p > end);
au->buf->vec.iov_len = p - au->buf->vec.iov_base;
dout(" built authorizer nonce %llx len %d\n", au->nonce,
(int)au->buf->vec.iov_len);
BUG_ON(au->buf->vec.iov_len > maxlen);
return 0;
out_au:
......@@ -452,8 +445,9 @@ static int ceph_x_build_request(struct ceph_auth_client *ac,
if (need & CEPH_ENTITY_TYPE_AUTH) {
struct ceph_x_authenticate *auth = (void *)(head + 1);
void *p = auth + 1;
struct ceph_x_challenge_blob tmp;
char tmp_enc[40];
void *enc_buf = xi->auth_authorizer.enc_buf;
struct ceph_x_challenge_blob *blob = enc_buf +
ceph_x_encrypt_offset();
u64 *u;
if (p > end)
......@@ -464,16 +458,16 @@ static int ceph_x_build_request(struct ceph_auth_client *ac,
/* encrypt and hash */
get_random_bytes(&auth->client_challenge, sizeof(u64));
tmp.client_challenge = auth->client_challenge;
tmp.server_challenge = cpu_to_le64(xi->server_challenge);
ret = ceph_x_encrypt(&xi->secret, &tmp, sizeof(tmp),
tmp_enc, sizeof(tmp_enc));
blob->client_challenge = auth->client_challenge;
blob->server_challenge = cpu_to_le64(xi->server_challenge);
ret = ceph_x_encrypt(&xi->secret, enc_buf, CEPHX_AU_ENC_BUF_LEN,
sizeof(*blob));
if (ret < 0)
return ret;
auth->struct_v = 1;
auth->key = 0;
for (u = (u64 *)tmp_enc; u + 1 <= (u64 *)(tmp_enc + ret); u++)
for (u = (u64 *)enc_buf; u + 1 <= (u64 *)(enc_buf + ret); u++)
auth->key ^= *(__le64 *)u;
dout(" server_challenge %llx client_challenge %llx key %llx\n",
xi->server_challenge, le64_to_cpu(auth->client_challenge),
......@@ -600,8 +594,8 @@ static int ceph_x_create_authorizer(
auth->authorizer = (struct ceph_authorizer *) au;
auth->authorizer_buf = au->buf->vec.iov_base;
auth->authorizer_buf_len = au->buf->vec.iov_len;
auth->authorizer_reply_buf = au->reply_buf;
auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
auth->authorizer_reply_buf = au->enc_buf;
auth->authorizer_reply_buf_len = CEPHX_AU_ENC_BUF_LEN;
auth->sign_message = ac->ops->sign_message;
auth->check_message_signature = ac->ops->check_message_signature;
......@@ -629,27 +623,25 @@ static int ceph_x_update_authorizer(
}
static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac,
struct ceph_authorizer *a, size_t len)
struct ceph_authorizer *a)
{
struct ceph_x_authorizer *au = (void *)a;
int ret = 0;
struct ceph_x_authorize_reply reply;
void *preply = &reply;
void *p = au->reply_buf;
void *end = p + sizeof(au->reply_buf);
void *p = au->enc_buf;
struct ceph_x_authorize_reply *reply = p + ceph_x_encrypt_offset();
int ret;
ret = ceph_x_decrypt(&au->session_key, &p, end, &preply, sizeof(reply));
ret = ceph_x_decrypt(&au->session_key, &p, p + CEPHX_AU_ENC_BUF_LEN);
if (ret < 0)
return ret;
if (ret != sizeof(reply))
if (ret != sizeof(*reply))
return -EPERM;
if (au->nonce + 1 != le64_to_cpu(reply.nonce_plus_one))
if (au->nonce + 1 != le64_to_cpu(reply->nonce_plus_one))
ret = -EPERM;
else
ret = 0;
dout("verify_authorizer_reply nonce %llx got %llx ret %d\n",
au->nonce, le64_to_cpu(reply.nonce_plus_one), ret);
au->nonce, le64_to_cpu(reply->nonce_plus_one), ret);
return ret;
}
......@@ -704,35 +696,48 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
invalidate_ticket(ac, CEPH_ENTITY_TYPE_AUTH);
}
static int calcu_signature(struct ceph_x_authorizer *au,
struct ceph_msg *msg, __le64 *sig)
static int calc_signature(struct ceph_x_authorizer *au, struct ceph_msg *msg,
__le64 *psig)
{
void *enc_buf = au->enc_buf;
struct {
__le32 len;
__le32 header_crc;
__le32 front_crc;
__le32 middle_crc;
__le32 data_crc;
} __packed *sigblock = enc_buf + ceph_x_encrypt_offset();
int ret;
char tmp_enc[40];
__le32 tmp[5] = {
cpu_to_le32(16), msg->hdr.crc, msg->footer.front_crc,
msg->footer.middle_crc, msg->footer.data_crc,
};
ret = ceph_x_encrypt(&au->session_key, &tmp, sizeof(tmp),
tmp_enc, sizeof(tmp_enc));
sigblock->len = cpu_to_le32(4*sizeof(u32));
sigblock->header_crc = msg->hdr.crc;
sigblock->front_crc = msg->footer.front_crc;
sigblock->middle_crc = msg->footer.middle_crc;
sigblock->data_crc = msg->footer.data_crc;
ret = ceph_x_encrypt(&au->session_key, enc_buf, CEPHX_AU_ENC_BUF_LEN,
sizeof(*sigblock));
if (ret < 0)
return ret;
*sig = *(__le64*)(tmp_enc + 4);
*psig = *(__le64 *)(enc_buf + sizeof(u32));
return 0;
}
static int ceph_x_sign_message(struct ceph_auth_handshake *auth,
struct ceph_msg *msg)
{
__le64 sig;
int ret;
if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN))
return 0;
ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
msg, &msg->footer.sig);
if (ret < 0)
ret = calc_signature((struct ceph_x_authorizer *)auth->authorizer,
msg, &sig);
if (ret)
return ret;
msg->footer.sig = sig;
msg->footer.flags |= CEPH_MSG_FOOTER_SIGNED;
return 0;
}
......@@ -746,9 +751,9 @@ static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth,
if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN))
return 0;
ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
msg, &sig_check);
if (ret < 0)
ret = calc_signature((struct ceph_x_authorizer *)auth->authorizer,
msg, &sig_check);
if (ret)
return ret;
if (sig_check == msg->footer.sig)
return 0;
......
......@@ -24,6 +24,7 @@ struct ceph_x_ticket_handler {
unsigned long renew_after, expires;
};
#define CEPHX_AU_ENC_BUF_LEN 128 /* big enough for encrypted blob */
struct ceph_x_authorizer {
struct ceph_authorizer base;
......@@ -32,7 +33,7 @@ struct ceph_x_authorizer {
unsigned int service;
u64 nonce;
u64 secret_id;
char reply_buf[128]; /* big enough for encrypted blob */
char enc_buf[CEPHX_AU_ENC_BUF_LEN] __aligned(8);
};
struct ceph_x_info {
......
......@@ -17,10 +17,12 @@
# include <linux/kernel.h>
# include <linux/crush/crush.h>
# include <linux/crush/hash.h>
# include <linux/crush/mapper.h>
#else
# include "crush_compat.h"
# include "crush.h"
# include "hash.h"
# include "mapper.h"
#endif
#include "crush_ln_table.h"
......
......@@ -13,14 +13,60 @@
#include <linux/ceph/decode.h>
#include "crypto.h"
/*
* Set ->key and ->tfm. The rest of the key should be filled in before
* this function is called.
*/
static int set_secret(struct ceph_crypto_key *key, void *buf)
{
unsigned int noio_flag;
int ret;
key->key = NULL;
key->tfm = NULL;
switch (key->type) {
case CEPH_CRYPTO_NONE:
return 0; /* nothing to do */
case CEPH_CRYPTO_AES:
break;
default:
return -ENOTSUPP;
}
WARN_ON(!key->len);
key->key = kmemdup(buf, key->len, GFP_NOIO);
if (!key->key) {
ret = -ENOMEM;
goto fail;
}
/* crypto_alloc_skcipher() allocates with GFP_KERNEL */
noio_flag = memalloc_noio_save();
key->tfm = crypto_alloc_skcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC);
memalloc_noio_restore(noio_flag);
if (IS_ERR(key->tfm)) {
ret = PTR_ERR(key->tfm);
key->tfm = NULL;
goto fail;
}
ret = crypto_skcipher_setkey(key->tfm, key->key, key->len);
if (ret)
goto fail;
return 0;
fail:
ceph_crypto_key_destroy(key);
return ret;
}
int ceph_crypto_key_clone(struct ceph_crypto_key *dst,
const struct ceph_crypto_key *src)
{
memcpy(dst, src, sizeof(struct ceph_crypto_key));
dst->key = kmemdup(src->key, src->len, GFP_NOFS);
if (!dst->key)
return -ENOMEM;
return 0;
return set_secret(dst, src->key);
}
int ceph_crypto_key_encode(struct ceph_crypto_key *key, void **p, void *end)
......@@ -37,16 +83,16 @@ int ceph_crypto_key_encode(struct ceph_crypto_key *key, void **p, void *end)
int ceph_crypto_key_decode(struct ceph_crypto_key *key, void **p, void *end)
{
int ret;
ceph_decode_need(p, end, 2*sizeof(u16) + sizeof(key->created), bad);
key->type = ceph_decode_16(p);
ceph_decode_copy(p, &key->created, sizeof(key->created));
key->len = ceph_decode_16(p);
ceph_decode_need(p, end, key->len, bad);
key->key = kmalloc(key->len, GFP_NOFS);
if (!key->key)
return -ENOMEM;
ceph_decode_copy(p, key->key, key->len);
return 0;
ret = set_secret(key, *p);
*p += key->len;
return ret;
bad:
dout("failed to decode crypto key\n");
......@@ -80,9 +126,14 @@ int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *inkey)
return 0;
}
static struct crypto_skcipher *ceph_crypto_alloc_cipher(void)
void ceph_crypto_key_destroy(struct ceph_crypto_key *key)
{
return crypto_alloc_skcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC);
if (key) {
kfree(key->key);
key->key = NULL;
crypto_free_skcipher(key->tfm);
key->tfm = NULL;
}
}
static const u8 *aes_iv = (u8 *)CEPH_AES_IV;
......@@ -157,372 +208,82 @@ static void teardown_sgtable(struct sg_table *sgt)
sg_free_table(sgt);
}
static int ceph_aes_encrypt(const void *key, int key_len,
void *dst, size_t *dst_len,
const void *src, size_t src_len)
static int ceph_aes_crypt(const struct ceph_crypto_key *key, bool encrypt,
void *buf, int buf_len, int in_len, int *pout_len)
{
struct scatterlist sg_in[2], prealloc_sg;
struct sg_table sg_out;
struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher();
SKCIPHER_REQUEST_ON_STACK(req, tfm);
int ret;
SKCIPHER_REQUEST_ON_STACK(req, key->tfm);
struct sg_table sgt;
struct scatterlist prealloc_sg;
char iv[AES_BLOCK_SIZE];
size_t zero_padding = (0x10 - (src_len & 0x0f));
char pad[16];
if (IS_ERR(tfm))
return PTR_ERR(tfm);
memset(pad, zero_padding, zero_padding);
*dst_len = src_len + zero_padding;
sg_init_table(sg_in, 2);
sg_set_buf(&sg_in[0], src, src_len);
sg_set_buf(&sg_in[1], pad, zero_padding);
ret = setup_sgtable(&sg_out, &prealloc_sg, dst, *dst_len);
if (ret)
goto out_tfm;
crypto_skcipher_setkey((void *)tfm, key, key_len);
memcpy(iv, aes_iv, AES_BLOCK_SIZE);
skcipher_request_set_tfm(req, tfm);
skcipher_request_set_callback(req, 0, NULL, NULL);
skcipher_request_set_crypt(req, sg_in, sg_out.sgl,
src_len + zero_padding, iv);
/*
print_hex_dump(KERN_ERR, "enc key: ", DUMP_PREFIX_NONE, 16, 1,
key, key_len, 1);
print_hex_dump(KERN_ERR, "enc src: ", DUMP_PREFIX_NONE, 16, 1,
src, src_len, 1);
print_hex_dump(KERN_ERR, "enc pad: ", DUMP_PREFIX_NONE, 16, 1,
pad, zero_padding, 1);
*/
ret = crypto_skcipher_encrypt(req);
skcipher_request_zero(req);
if (ret < 0) {
pr_err("ceph_aes_crypt failed %d\n", ret);
goto out_sg;
}
/*
print_hex_dump(KERN_ERR, "enc out: ", DUMP_PREFIX_NONE, 16, 1,
dst, *dst_len, 1);
*/
out_sg:
teardown_sgtable(&sg_out);
out_tfm:
crypto_free_skcipher(tfm);
return ret;
}
static int ceph_aes_encrypt2(const void *key, int key_len, void *dst,
size_t *dst_len,
const void *src1, size_t src1_len,
const void *src2, size_t src2_len)
{
struct scatterlist sg_in[3], prealloc_sg;
struct sg_table sg_out;
struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher();
SKCIPHER_REQUEST_ON_STACK(req, tfm);
int pad_byte = AES_BLOCK_SIZE - (in_len & (AES_BLOCK_SIZE - 1));
int crypt_len = encrypt ? in_len + pad_byte : in_len;
int ret;
char iv[AES_BLOCK_SIZE];
size_t zero_padding = (0x10 - ((src1_len + src2_len) & 0x0f));
char pad[16];
if (IS_ERR(tfm))
return PTR_ERR(tfm);
memset(pad, zero_padding, zero_padding);
*dst_len = src1_len + src2_len + zero_padding;
sg_init_table(sg_in, 3);
sg_set_buf(&sg_in[0], src1, src1_len);
sg_set_buf(&sg_in[1], src2, src2_len);
sg_set_buf(&sg_in[2], pad, zero_padding);
ret = setup_sgtable(&sg_out, &prealloc_sg, dst, *dst_len);
WARN_ON(crypt_len > buf_len);
if (encrypt)
memset(buf + in_len, pad_byte, pad_byte);
ret = setup_sgtable(&sgt, &prealloc_sg, buf, crypt_len);
if (ret)
goto out_tfm;
crypto_skcipher_setkey((void *)tfm, key, key_len);
memcpy(iv, aes_iv, AES_BLOCK_SIZE);
skcipher_request_set_tfm(req, tfm);
skcipher_request_set_callback(req, 0, NULL, NULL);
skcipher_request_set_crypt(req, sg_in, sg_out.sgl,
src1_len + src2_len + zero_padding, iv);
/*
print_hex_dump(KERN_ERR, "enc key: ", DUMP_PREFIX_NONE, 16, 1,
key, key_len, 1);
print_hex_dump(KERN_ERR, "enc src1: ", DUMP_PREFIX_NONE, 16, 1,
src1, src1_len, 1);
print_hex_dump(KERN_ERR, "enc src2: ", DUMP_PREFIX_NONE, 16, 1,
src2, src2_len, 1);
print_hex_dump(KERN_ERR, "enc pad: ", DUMP_PREFIX_NONE, 16, 1,
pad, zero_padding, 1);
*/
ret = crypto_skcipher_encrypt(req);
skcipher_request_zero(req);
if (ret < 0) {
pr_err("ceph_aes_crypt2 failed %d\n", ret);
goto out_sg;
}
/*
print_hex_dump(KERN_ERR, "enc out: ", DUMP_PREFIX_NONE, 16, 1,
dst, *dst_len, 1);
*/
out_sg:
teardown_sgtable(&sg_out);
out_tfm:
crypto_free_skcipher(tfm);
return ret;
}
static int ceph_aes_decrypt(const void *key, int key_len,
void *dst, size_t *dst_len,
const void *src, size_t src_len)
{
struct sg_table sg_in;
struct scatterlist sg_out[2], prealloc_sg;
struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher();
SKCIPHER_REQUEST_ON_STACK(req, tfm);
char pad[16];
char iv[AES_BLOCK_SIZE];
int ret;
int last_byte;
if (IS_ERR(tfm))
return PTR_ERR(tfm);
sg_init_table(sg_out, 2);
sg_set_buf(&sg_out[0], dst, *dst_len);
sg_set_buf(&sg_out[1], pad, sizeof(pad));
ret = setup_sgtable(&sg_in, &prealloc_sg, src, src_len);
if (ret)
goto out_tfm;
return ret;
crypto_skcipher_setkey((void *)tfm, key, key_len);
memcpy(iv, aes_iv, AES_BLOCK_SIZE);
skcipher_request_set_tfm(req, tfm);
skcipher_request_set_tfm(req, key->tfm);
skcipher_request_set_callback(req, 0, NULL, NULL);
skcipher_request_set_crypt(req, sg_in.sgl, sg_out,
src_len, iv);
skcipher_request_set_crypt(req, sgt.sgl, sgt.sgl, crypt_len, iv);
/*
print_hex_dump(KERN_ERR, "dec key: ", DUMP_PREFIX_NONE, 16, 1,
key, key_len, 1);
print_hex_dump(KERN_ERR, "dec in: ", DUMP_PREFIX_NONE, 16, 1,
src, src_len, 1);
print_hex_dump(KERN_ERR, "key: ", DUMP_PREFIX_NONE, 16, 1,
key->key, key->len, 1);
print_hex_dump(KERN_ERR, " in: ", DUMP_PREFIX_NONE, 16, 1,
buf, crypt_len, 1);
*/
ret = crypto_skcipher_decrypt(req);
skcipher_request_zero(req);
if (ret < 0) {
pr_err("ceph_aes_decrypt failed %d\n", ret);
goto out_sg;
}
if (src_len <= *dst_len)
last_byte = ((char *)dst)[src_len - 1];
if (encrypt)
ret = crypto_skcipher_encrypt(req);
else
last_byte = pad[src_len - *dst_len - 1];
if (last_byte <= 16 && src_len >= last_byte) {
*dst_len = src_len - last_byte;
} else {
pr_err("ceph_aes_decrypt got bad padding %d on src len %d\n",
last_byte, (int)src_len);
return -EPERM; /* bad padding */
}
/*
print_hex_dump(KERN_ERR, "dec out: ", DUMP_PREFIX_NONE, 16, 1,
dst, *dst_len, 1);
*/
out_sg:
teardown_sgtable(&sg_in);
out_tfm:
crypto_free_skcipher(tfm);
return ret;
}
static int ceph_aes_decrypt2(const void *key, int key_len,
void *dst1, size_t *dst1_len,
void *dst2, size_t *dst2_len,
const void *src, size_t src_len)
{
struct sg_table sg_in;
struct scatterlist sg_out[3], prealloc_sg;
struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher();
SKCIPHER_REQUEST_ON_STACK(req, tfm);
char pad[16];
char iv[AES_BLOCK_SIZE];
int ret;
int last_byte;
if (IS_ERR(tfm))
return PTR_ERR(tfm);
sg_init_table(sg_out, 3);
sg_set_buf(&sg_out[0], dst1, *dst1_len);
sg_set_buf(&sg_out[1], dst2, *dst2_len);
sg_set_buf(&sg_out[2], pad, sizeof(pad));
ret = setup_sgtable(&sg_in, &prealloc_sg, src, src_len);
if (ret)
goto out_tfm;
crypto_skcipher_setkey((void *)tfm, key, key_len);
memcpy(iv, aes_iv, AES_BLOCK_SIZE);
skcipher_request_set_tfm(req, tfm);
skcipher_request_set_callback(req, 0, NULL, NULL);
skcipher_request_set_crypt(req, sg_in.sgl, sg_out,
src_len, iv);
/*
print_hex_dump(KERN_ERR, "dec key: ", DUMP_PREFIX_NONE, 16, 1,
key, key_len, 1);
print_hex_dump(KERN_ERR, "dec in: ", DUMP_PREFIX_NONE, 16, 1,
src, src_len, 1);
*/
ret = crypto_skcipher_decrypt(req);
ret = crypto_skcipher_decrypt(req);
skcipher_request_zero(req);
if (ret < 0) {
pr_err("ceph_aes_decrypt failed %d\n", ret);
goto out_sg;
}
if (src_len <= *dst1_len)
last_byte = ((char *)dst1)[src_len - 1];
else if (src_len <= *dst1_len + *dst2_len)
last_byte = ((char *)dst2)[src_len - *dst1_len - 1];
else
last_byte = pad[src_len - *dst1_len - *dst2_len - 1];
if (last_byte <= 16 && src_len >= last_byte) {
src_len -= last_byte;
} else {
pr_err("ceph_aes_decrypt got bad padding %d on src len %d\n",
last_byte, (int)src_len);
return -EPERM; /* bad padding */
}
if (src_len < *dst1_len) {
*dst1_len = src_len;
*dst2_len = 0;
} else {
*dst2_len = src_len - *dst1_len;
if (ret) {
pr_err("%s %scrypt failed: %d\n", __func__,
encrypt ? "en" : "de", ret);
goto out_sgt;
}
/*
print_hex_dump(KERN_ERR, "dec out1: ", DUMP_PREFIX_NONE, 16, 1,
dst1, *dst1_len, 1);
print_hex_dump(KERN_ERR, "dec out2: ", DUMP_PREFIX_NONE, 16, 1,
dst2, *dst2_len, 1);
print_hex_dump(KERN_ERR, "out: ", DUMP_PREFIX_NONE, 16, 1,
buf, crypt_len, 1);
*/
out_sg:
teardown_sgtable(&sg_in);
out_tfm:
crypto_free_skcipher(tfm);
return ret;
}
int ceph_decrypt(struct ceph_crypto_key *secret, void *dst, size_t *dst_len,
const void *src, size_t src_len)
{
switch (secret->type) {
case CEPH_CRYPTO_NONE:
if (*dst_len < src_len)
return -ERANGE;
memcpy(dst, src, src_len);
*dst_len = src_len;
return 0;
case CEPH_CRYPTO_AES:
return ceph_aes_decrypt(secret->key, secret->len, dst,
dst_len, src, src_len);
default:
return -EINVAL;
}
}
int ceph_decrypt2(struct ceph_crypto_key *secret,
void *dst1, size_t *dst1_len,
void *dst2, size_t *dst2_len,
const void *src, size_t src_len)
{
size_t t;
switch (secret->type) {
case CEPH_CRYPTO_NONE:
if (*dst1_len + *dst2_len < src_len)
return -ERANGE;
t = min(*dst1_len, src_len);
memcpy(dst1, src, t);
*dst1_len = t;
src += t;
src_len -= t;
if (src_len) {
t = min(*dst2_len, src_len);
memcpy(dst2, src, t);
*dst2_len = t;
if (encrypt) {
*pout_len = crypt_len;
} else {
pad_byte = *(char *)(buf + in_len - 1);
if (pad_byte > 0 && pad_byte <= AES_BLOCK_SIZE &&
in_len >= pad_byte) {
*pout_len = in_len - pad_byte;
} else {
pr_err("%s got bad padding %d on in_len %d\n",
__func__, pad_byte, in_len);
ret = -EPERM;
goto out_sgt;
}
return 0;
case CEPH_CRYPTO_AES:
return ceph_aes_decrypt2(secret->key, secret->len,
dst1, dst1_len, dst2, dst2_len,
src, src_len);
default:
return -EINVAL;
}
}
int ceph_encrypt(struct ceph_crypto_key *secret, void *dst, size_t *dst_len,
const void *src, size_t src_len)
{
switch (secret->type) {
case CEPH_CRYPTO_NONE:
if (*dst_len < src_len)
return -ERANGE;
memcpy(dst, src, src_len);
*dst_len = src_len;
return 0;
case CEPH_CRYPTO_AES:
return ceph_aes_encrypt(secret->key, secret->len, dst,
dst_len, src, src_len);
default:
return -EINVAL;
}
out_sgt:
teardown_sgtable(&sgt);
return ret;
}
int ceph_encrypt2(struct ceph_crypto_key *secret, void *dst, size_t *dst_len,
const void *src1, size_t src1_len,
const void *src2, size_t src2_len)
int ceph_crypt(const struct ceph_crypto_key *key, bool encrypt,
void *buf, int buf_len, int in_len, int *pout_len)
{
switch (secret->type) {
switch (key->type) {
case CEPH_CRYPTO_NONE:
if (*dst_len < src1_len + src2_len)
return -ERANGE;
memcpy(dst, src1, src1_len);
memcpy(dst + src1_len, src2, src2_len);
*dst_len = src1_len + src2_len;
*pout_len = in_len;
return 0;
case CEPH_CRYPTO_AES:
return ceph_aes_encrypt2(secret->key, secret->len, dst, dst_len,
src1, src1_len, src2, src2_len);
return ceph_aes_crypt(key, encrypt, buf, buf_len, in_len,
pout_len);
default:
return -EINVAL;
return -ENOTSUPP;
}
}
......
......@@ -12,37 +12,19 @@ struct ceph_crypto_key {
struct ceph_timespec created;
int len;
void *key;
struct crypto_skcipher *tfm;
};
static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key)
{
if (key) {
kfree(key->key);
key->key = NULL;
}
}
int ceph_crypto_key_clone(struct ceph_crypto_key *dst,
const struct ceph_crypto_key *src);
int ceph_crypto_key_encode(struct ceph_crypto_key *key, void **p, void *end);
int ceph_crypto_key_decode(struct ceph_crypto_key *key, void **p, void *end);
int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *in);
void ceph_crypto_key_destroy(struct ceph_crypto_key *key);
/* crypto.c */
int ceph_decrypt(struct ceph_crypto_key *secret,
void *dst, size_t *dst_len,
const void *src, size_t src_len);
int ceph_encrypt(struct ceph_crypto_key *secret,
void *dst, size_t *dst_len,
const void *src, size_t src_len);
int ceph_decrypt2(struct ceph_crypto_key *secret,
void *dst1, size_t *dst1_len,
void *dst2, size_t *dst2_len,
const void *src, size_t src_len);
int ceph_encrypt2(struct ceph_crypto_key *secret,
void *dst, size_t *dst_len,
const void *src1, size_t src1_len,
const void *src2, size_t src2_len);
int ceph_crypt(const struct ceph_crypto_key *key, bool encrypt,
void *buf, int buf_len, int in_len, int *pout_len);
int ceph_crypto_init(void);
void ceph_crypto_shutdown(void);
......
......@@ -1393,15 +1393,9 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
return NULL;
}
/* Can't hold the mutex while getting authorizer */
mutex_unlock(&con->mutex);
auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
mutex_lock(&con->mutex);
if (IS_ERR(auth))
return auth;
if (con->state != CON_STATE_NEGOTIATING)
return ERR_PTR(-EAGAIN);
con->auth_reply_buf = auth->authorizer_reply_buf;
con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
......@@ -2027,6 +2021,19 @@ static int process_connect(struct ceph_connection *con)
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
if (con->auth_reply_buf) {
/*
* Any connection that defines ->get_authorizer()
* should also define ->verify_authorizer_reply().
* See get_connect_authorizer().
*/
ret = con->ops->verify_authorizer_reply(con);
if (ret < 0) {
con->error_msg = "bad authorize reply";
return ret;
}
}
switch (con->in_reply.tag) {
case CEPH_MSGR_TAG_FEATURES:
pr_err("%s%lld %s feature set mismatch,"
......
......@@ -1028,21 +1028,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
err = -ENOMEM;
monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
sizeof(struct ceph_mon_subscribe_ack),
GFP_NOFS, true);
GFP_KERNEL, true);
if (!monc->m_subscribe_ack)
goto out_auth;
monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128, GFP_NOFS,
true);
monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
GFP_KERNEL, true);
if (!monc->m_subscribe)
goto out_subscribe_ack;
monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
true);
monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
GFP_KERNEL, true);
if (!monc->m_auth_reply)
goto out_subscribe;
monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
monc->pending_auth = 0;
if (!monc->m_auth)
goto out_auth_reply;
......
......@@ -460,7 +460,7 @@ static void request_init(struct ceph_osd_request *req)
kref_init(&req->r_kref);
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
init_completion(&req->r_done_completion);
RB_CLEAR_NODE(&req->r_node);
RB_CLEAR_NODE(&req->r_mc_node);
INIT_LIST_HEAD(&req->r_unsafe_item);
......@@ -1725,7 +1725,7 @@ static void submit_request(struct ceph_osd_request *req, bool wrlocked)
__submit_request(req, wrlocked);
}
static void __finish_request(struct ceph_osd_request *req)
static void finish_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
struct ceph_osd *osd = req->r_osd;
......@@ -1747,12 +1747,6 @@ static void __finish_request(struct ceph_osd_request *req)
ceph_msg_revoke_incoming(req->r_reply);
}
static void finish_request(struct ceph_osd_request *req)
{
__finish_request(req);
ceph_osdc_put_request(req);
}
static void __complete_request(struct ceph_osd_request *req)
{
if (req->r_callback)
......@@ -1770,9 +1764,9 @@ static void complete_request(struct ceph_osd_request *req, int err)
dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
req->r_result = err;
__finish_request(req);
finish_request(req);
__complete_request(req);
complete_all(&req->r_safe_completion);
complete_all(&req->r_done_completion);
ceph_osdc_put_request(req);
}
......@@ -1798,6 +1792,8 @@ static void cancel_request(struct ceph_osd_request *req)
cancel_map_check(req);
finish_request(req);
complete_all(&req->r_done_completion);
ceph_osdc_put_request(req);
}
static void check_pool_dne(struct ceph_osd_request *req)
......@@ -2808,12 +2804,12 @@ static bool done_request(const struct ceph_osd_request *req,
* ->r_unsafe_callback is set? yes no
*
* first reply is OK (needed r_cb/r_completion, r_cb/r_completion,
* any or needed/got safe) r_safe_completion r_safe_completion
* any or needed/got safe) r_done_completion r_done_completion
*
* first reply is unsafe r_unsafe_cb(true) (nothing)
*
* when we get the safe reply r_unsafe_cb(false), r_cb/r_completion,
* r_safe_completion r_safe_completion
* r_done_completion r_done_completion
*/
static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
{
......@@ -2915,7 +2911,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
}
if (done_request(req, &m)) {
__finish_request(req);
finish_request(req);
if (req->r_linger) {
WARN_ON(req->r_unsafe_callback);
dout("req %p tid %llu cb (locked)\n", req, req->r_tid);
......@@ -2934,8 +2930,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
dout("req %p tid %llu cb\n", req, req->r_tid);
__complete_request(req);
}
if (m.flags & CEPH_OSD_FLAG_ONDISK)
complete_all(&req->r_safe_completion);
complete_all(&req->r_done_completion);
ceph_osdc_put_request(req);
} else {
if (req->r_unsafe_callback) {
......@@ -3471,9 +3466,8 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
EXPORT_SYMBOL(ceph_osdc_start_request);
/*
* Unregister a registered request. The request is not completed (i.e.
* no callbacks or wakeups) - higher layers are supposed to know what
* they are canceling.
* Unregister a registered request. The request is not completed:
* ->r_result isn't set and __complete_request() isn't called.
*/
void ceph_osdc_cancel_request(struct ceph_osd_request *req)
{
......@@ -3500,9 +3494,6 @@ static int wait_request_timeout(struct ceph_osd_request *req,
if (left <= 0) {
left = left ?: -ETIMEDOUT;
ceph_osdc_cancel_request(req);
/* kludge - need to to wake ceph_osdc_sync() */
complete_all(&req->r_safe_completion);
} else {
left = req->r_result; /* completed */
}
......@@ -3549,7 +3540,7 @@ void ceph_osdc_sync(struct ceph_osd_client *osdc)
up_read(&osdc->lock);
dout("%s waiting on req %p tid %llu last_tid %llu\n",
__func__, req, req->r_tid, last_tid);
wait_for_completion(&req->r_safe_completion);
wait_for_completion(&req->r_done_completion);
ceph_osdc_put_request(req);
goto again;
}
......@@ -4478,13 +4469,13 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
}
static int verify_authorizer_reply(struct ceph_connection *con, int len)
static int verify_authorizer_reply(struct ceph_connection *con)
{
struct ceph_osd *o = con->private;
struct ceph_osd_client *osdc = o->o_osdc;
struct ceph_auth_client *ac = osdc->client->monc.auth;
return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer);
}
static int invalidate_authorizer(struct ceph_connection *con)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册