diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 7b274ff4632c6944e32b95d605ee9aa9769f7082..36d2b9f4e83654637925d57fce5a4fc8cd0f009e 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -3756,7 +3756,7 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, struct rbd_device *rbd_dev = arg; void *p = data; void *const end = p + data_len; - u8 struct_v; + u8 struct_v = 0; u32 len; u32 notify_op; int ret; diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index ef3ebd780aff8bf8c9203ae366f3d4b688cd7af6..a0f1e2b91c8e0f46051e398b663f89220365f187 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -315,7 +315,32 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) struct page **pages; pgoff_t next_index; int nr_pages = 0; - int ret; + int got = 0; + int ret = 0; + + if (!current->journal_info) { + /* caller of readpages does not hold buffer and read caps + * (fadvise, madvise and readahead cases) */ + int want = CEPH_CAP_FILE_CACHE; + ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got); + if (ret < 0) { + dout("start_read %p, error getting cap\n", inode); + } else if (!(got & want)) { + dout("start_read %p, no cache cap\n", inode); + ret = 0; + } + if (ret <= 0) { + if (got) + ceph_put_cap_refs(ci, got); + while (!list_empty(page_list)) { + page = list_entry(page_list->prev, + struct page, lru); + list_del(&page->lru); + put_page(page); + } + return ret; + } + } off = (u64) page_offset(page); @@ -338,15 +363,18 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq, ci->i_truncate_size, false); - if (IS_ERR(req)) - return PTR_ERR(req); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } /* build page vector */ nr_pages = calc_pages_for(0, len); pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL); - ret = -ENOMEM; - if (!pages) - goto out; + if (!pages) { + ret = -ENOMEM; + goto out_put; + } for (i = 0; i < nr_pages; ++i) { page = list_entry(page_list->prev, struct page, lru); BUG_ON(PageLocked(page)); @@ -378,6 +406,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) if (ret < 0) goto out_pages; ceph_osdc_put_request(req); + + /* After adding locked pages to page cache, the inode holds cache cap. + * So we can drop our cap refs. */ + if (got) + ceph_put_cap_refs(ci, got); + return nr_pages; out_pages: @@ -386,8 +420,11 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) unlock_page(pages[i]); } ceph_put_page_vector(pages, nr_pages, false); -out: +out_put: ceph_osdc_put_request(req); +out: + if (got) + ceph_put_cap_refs(ci, got); return ret; } @@ -424,7 +461,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, rc = start_read(inode, page_list, max); if (rc < 0) goto out; - BUG_ON(rc == 0); } out: ceph_fscache_readpages_cancel(inode, page_list); @@ -438,7 +474,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, * only snap context we are allowed to write back. */ static struct ceph_snap_context *get_oldest_context(struct inode *inode, - loff_t *snap_size) + loff_t *snap_size, + u64 *truncate_size, + u32 *truncate_seq) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_snap_context *snapc = NULL; @@ -452,6 +490,10 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode, snapc = ceph_get_snap_context(capsnap->context); if (snap_size) *snap_size = capsnap->size; + if (truncate_size) + *truncate_size = capsnap->truncate_size; + if (truncate_seq) + *truncate_seq = capsnap->truncate_seq; break; } } @@ -459,6 +501,10 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode, snapc = ceph_get_snap_context(ci->i_head_snapc); dout(" head snapc %p has %d dirty pages\n", snapc, ci->i_wrbuffer_ref_head); + if (truncate_size) + *truncate_size = capsnap->truncate_size; + if (truncate_seq) + *truncate_seq = capsnap->truncate_seq; } spin_unlock(&ci->i_ceph_lock); return snapc; @@ -501,7 +547,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) dout("writepage %p page %p not dirty?\n", inode, page); goto out; } - oldest = get_oldest_context(inode, &snap_size); + oldest = get_oldest_context(inode, &snap_size, + &truncate_size, &truncate_seq); if (snapc->seq > oldest->seq) { dout("writepage %p page %p snapc %p not writeable - noop\n", inode, page, snapc); @@ -512,12 +559,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) } ceph_put_snap_context(oldest); - spin_lock(&ci->i_ceph_lock); - truncate_seq = ci->i_truncate_seq; - truncate_size = ci->i_truncate_size; if (snap_size == -1) snap_size = i_size_read(inode); - spin_unlock(&ci->i_ceph_lock); /* is this a partial page at end of file? */ if (page_off >= snap_size) { @@ -764,7 +807,8 @@ static int ceph_writepages_start(struct address_space *mapping, /* find oldest snap context with dirty data */ ceph_put_snap_context(snapc); snap_size = -1; - snapc = get_oldest_context(inode, &snap_size); + snapc = get_oldest_context(inode, &snap_size, + &truncate_size, &truncate_seq); if (!snapc) { /* hmm, why does writepages get called when there is no dirty data? */ @@ -774,11 +818,7 @@ static int ceph_writepages_start(struct address_space *mapping, dout(" oldest snapc is %p seq %lld (%d snaps)\n", snapc, snapc->seq, snapc->num_snaps); - spin_lock(&ci->i_ceph_lock); - truncate_seq = ci->i_truncate_seq; - truncate_size = ci->i_truncate_size; i_size = i_size_read(inode); - spin_unlock(&ci->i_ceph_lock); if (last_snapc && snapc != last_snapc) { /* if we switched to a newer snapc, restart our scan at the @@ -1124,7 +1164,8 @@ static int ceph_writepages_start(struct address_space *mapping, static int context_is_writeable_or_written(struct inode *inode, struct ceph_snap_context *snapc) { - struct ceph_snap_context *oldest = get_oldest_context(inode, NULL); + struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, + NULL, NULL); int ret = !oldest || snapc->seq <= oldest->seq; ceph_put_snap_context(oldest); @@ -1169,7 +1210,7 @@ static int ceph_update_writeable_page(struct file *file, * this page is already dirty in another (older) snap * context! is it writeable now? */ - oldest = get_oldest_context(inode, NULL); + oldest = get_oldest_context(inode, NULL, NULL, NULL); if (snapc->seq > oldest->seq) { ceph_put_snap_context(oldest); @@ -1371,9 +1412,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got)); if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) || - ci->i_inline_version == CEPH_INLINE_NONE) + ci->i_inline_version == CEPH_INLINE_NONE) { + current->journal_info = vma->vm_file; ret = filemap_fault(vma, vmf); - else + current->journal_info = NULL; + } else ret = -EAGAIN; dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", @@ -1905,6 +1948,15 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) struct ceph_string *pool_ns; int ret, flags; + if (ci->i_vino.snap != CEPH_NOSNAP) { + /* + * Pool permission check needs to write to the first object. + * But for snapshot, head of the first object may have alread + * been deleted. Skip check to avoid creating orphan object. + */ + return 0; + } + if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode), NOPOOLPERM)) return 0; diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 16e6ded0b7f281bf72e8074b9d2896713fe4aef2..baea866a6751facf4c1f18dda23e71582978dffe 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -987,96 +987,127 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) __cap_delay_cancel(mdsc, ci); } +struct cap_msg_args { + struct ceph_mds_session *session; + u64 ino, cid, follows; + u64 flush_tid, oldest_flush_tid, size, max_size; + u64 xattr_version; + struct ceph_buffer *xattr_buf; + struct timespec atime, mtime, ctime; + int op, caps, wanted, dirty; + u32 seq, issue_seq, mseq, time_warp_seq; + u32 flags; + kuid_t uid; + kgid_t gid; + umode_t mode; + bool inline_data; +}; + /* * Build and send a cap message to the given MDS. * * Caller should be holding s_mutex. */ -static int send_cap_msg(struct ceph_mds_session *session, - u64 ino, u64 cid, int op, - int caps, int wanted, int dirty, - u32 seq, u64 flush_tid, u64 oldest_flush_tid, - u32 issue_seq, u32 mseq, u64 size, u64 max_size, - struct timespec *mtime, struct timespec *atime, - struct timespec *ctime, u32 time_warp_seq, - kuid_t uid, kgid_t gid, umode_t mode, - u64 xattr_version, - struct ceph_buffer *xattrs_buf, - u64 follows, bool inline_data) +static int send_cap_msg(struct cap_msg_args *arg) { struct ceph_mds_caps *fc; struct ceph_msg *msg; void *p; size_t extra_len; + struct timespec zerotime = {0}; dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu" - " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op), - cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted), - ceph_cap_string(dirty), - seq, issue_seq, flush_tid, oldest_flush_tid, - mseq, follows, size, max_size, - xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); + " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op), + arg->cid, arg->ino, ceph_cap_string(arg->caps), + ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty), + arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid, + arg->mseq, arg->follows, arg->size, arg->max_size, + arg->xattr_version, + arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0); /* flock buffer size + inline version + inline data size + * osd_epoch_barrier + oldest_flush_tid */ - extra_len = 4 + 8 + 4 + 4 + 8; + extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4; msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len, GFP_NOFS, false); if (!msg) return -ENOMEM; - msg->hdr.version = cpu_to_le16(6); - msg->hdr.tid = cpu_to_le64(flush_tid); + msg->hdr.version = cpu_to_le16(10); + msg->hdr.tid = cpu_to_le64(arg->flush_tid); fc = msg->front.iov_base; memset(fc, 0, sizeof(*fc)); - fc->cap_id = cpu_to_le64(cid); - fc->op = cpu_to_le32(op); - fc->seq = cpu_to_le32(seq); - fc->issue_seq = cpu_to_le32(issue_seq); - fc->migrate_seq = cpu_to_le32(mseq); - fc->caps = cpu_to_le32(caps); - fc->wanted = cpu_to_le32(wanted); - fc->dirty = cpu_to_le32(dirty); - fc->ino = cpu_to_le64(ino); - fc->snap_follows = cpu_to_le64(follows); - - fc->size = cpu_to_le64(size); - fc->max_size = cpu_to_le64(max_size); - if (mtime) - ceph_encode_timespec(&fc->mtime, mtime); - if (atime) - ceph_encode_timespec(&fc->atime, atime); - if (ctime) - ceph_encode_timespec(&fc->ctime, ctime); - fc->time_warp_seq = cpu_to_le32(time_warp_seq); - - fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid)); - fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid)); - fc->mode = cpu_to_le32(mode); + fc->cap_id = cpu_to_le64(arg->cid); + fc->op = cpu_to_le32(arg->op); + fc->seq = cpu_to_le32(arg->seq); + fc->issue_seq = cpu_to_le32(arg->issue_seq); + fc->migrate_seq = cpu_to_le32(arg->mseq); + fc->caps = cpu_to_le32(arg->caps); + fc->wanted = cpu_to_le32(arg->wanted); + fc->dirty = cpu_to_le32(arg->dirty); + fc->ino = cpu_to_le64(arg->ino); + fc->snap_follows = cpu_to_le64(arg->follows); + + fc->size = cpu_to_le64(arg->size); + fc->max_size = cpu_to_le64(arg->max_size); + ceph_encode_timespec(&fc->mtime, &arg->mtime); + ceph_encode_timespec(&fc->atime, &arg->atime); + ceph_encode_timespec(&fc->ctime, &arg->ctime); + fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq); + + fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid)); + fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid)); + fc->mode = cpu_to_le32(arg->mode); + + fc->xattr_version = cpu_to_le64(arg->xattr_version); + if (arg->xattr_buf) { + msg->middle = ceph_buffer_get(arg->xattr_buf); + fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len); + msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len); + } p = fc + 1; - /* flock buffer size */ + /* flock buffer size (version 2) */ ceph_encode_32(&p, 0); - /* inline version */ - ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE); + /* inline version (version 4) */ + ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE); /* inline data size */ ceph_encode_32(&p, 0); - /* osd_epoch_barrier */ + /* osd_epoch_barrier (version 5) */ ceph_encode_32(&p, 0); - /* oldest_flush_tid */ - ceph_encode_64(&p, oldest_flush_tid); + /* oldest_flush_tid (version 6) */ + ceph_encode_64(&p, arg->oldest_flush_tid); - fc->xattr_version = cpu_to_le64(xattr_version); - if (xattrs_buf) { - msg->middle = ceph_buffer_get(xattrs_buf); - fc->xattr_len = cpu_to_le32(xattrs_buf->vec.iov_len); - msg->hdr.middle_len = cpu_to_le32(xattrs_buf->vec.iov_len); - } + /* + * caller_uid/caller_gid (version 7) + * + * Currently, we don't properly track which caller dirtied the caps + * last, and force a flush of them when there is a conflict. For now, + * just set this to 0:0, to emulate how the MDS has worked up to now. + */ + ceph_encode_32(&p, 0); + ceph_encode_32(&p, 0); + + /* pool namespace (version 8) (mds always ignores this) */ + ceph_encode_32(&p, 0); - ceph_con_send(&session->s_con, msg); + /* + * btime and change_attr (version 9) + * + * We just zero these out for now, as the MDS ignores them unless + * the requisite feature flags are set (which we don't do yet). + */ + ceph_encode_timespec(p, &zerotime); + p += sizeof(struct ceph_timespec); + ceph_encode_64(&p, 0); + + /* Advisory flags (version 10) */ + ceph_encode_32(&p, arg->flags); + + ceph_con_send(&arg->session->s_con, msg); return 0; } @@ -1115,27 +1146,17 @@ void ceph_queue_caps_release(struct inode *inode) * caller should hold snap_rwsem (read), s_mutex. */ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, - int op, int used, int want, int retain, int flushing, - u64 flush_tid, u64 oldest_flush_tid) + int op, bool sync, int used, int want, int retain, + int flushing, u64 flush_tid, u64 oldest_flush_tid) __releases(cap->ci->i_ceph_lock) { struct ceph_inode_info *ci = cap->ci; struct inode *inode = &ci->vfs_inode; - u64 cap_id = cap->cap_id; - int held, revoking, dropping, keep; - u64 follows, size, max_size; - u32 seq, issue_seq, mseq, time_warp_seq; - struct timespec mtime, atime, ctime; + struct cap_msg_args arg; + int held, revoking, dropping; int wake = 0; - umode_t mode; - kuid_t uid; - kgid_t gid; - struct ceph_mds_session *session; - u64 xattr_version = 0; - struct ceph_buffer *xattr_blob = NULL; int delayed = 0; int ret; - bool inline_data; held = cap->issued | cap->implemented; revoking = cap->implemented & ~cap->issued; @@ -1148,7 +1169,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, ceph_cap_string(revoking)); BUG_ON((retain & CEPH_CAP_PIN) == 0); - session = cap->session; + arg.session = cap->session; /* don't release wanted unless we've waited a bit. */ if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 && @@ -1177,40 +1198,51 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, cap->implemented &= cap->issued | used; cap->mds_wanted = want; - follows = flushing ? ci->i_head_snapc->seq : 0; - - keep = cap->implemented; - seq = cap->seq; - issue_seq = cap->issue_seq; - mseq = cap->mseq; - size = inode->i_size; - ci->i_reported_size = size; - max_size = ci->i_wanted_max_size; - ci->i_requested_max_size = max_size; - mtime = inode->i_mtime; - atime = inode->i_atime; - ctime = inode->i_ctime; - time_warp_seq = ci->i_time_warp_seq; - uid = inode->i_uid; - gid = inode->i_gid; - mode = inode->i_mode; + arg.ino = ceph_vino(inode).ino; + arg.cid = cap->cap_id; + arg.follows = flushing ? ci->i_head_snapc->seq : 0; + arg.flush_tid = flush_tid; + arg.oldest_flush_tid = oldest_flush_tid; + + arg.size = inode->i_size; + ci->i_reported_size = arg.size; + arg.max_size = ci->i_wanted_max_size; + ci->i_requested_max_size = arg.max_size; if (flushing & CEPH_CAP_XATTR_EXCL) { __ceph_build_xattrs_blob(ci); - xattr_blob = ci->i_xattrs.blob; - xattr_version = ci->i_xattrs.version; + arg.xattr_version = ci->i_xattrs.version; + arg.xattr_buf = ci->i_xattrs.blob; + } else { + arg.xattr_buf = NULL; } - inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + arg.mtime = inode->i_mtime; + arg.atime = inode->i_atime; + arg.ctime = inode->i_ctime; + + arg.op = op; + arg.caps = cap->implemented; + arg.wanted = want; + arg.dirty = flushing; + + arg.seq = cap->seq; + arg.issue_seq = cap->issue_seq; + arg.mseq = cap->mseq; + arg.time_warp_seq = ci->i_time_warp_seq; + + arg.uid = inode->i_uid; + arg.gid = inode->i_gid; + arg.mode = inode->i_mode; + + arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + arg.flags = 0; + if (sync) + arg.flags |= CEPH_CLIENT_CAPS_SYNC; spin_unlock(&ci->i_ceph_lock); - ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, - op, keep, want, flushing, seq, - flush_tid, oldest_flush_tid, issue_seq, mseq, - size, max_size, &mtime, &atime, &ctime, time_warp_seq, - uid, gid, mode, xattr_version, xattr_blob, - follows, inline_data); + ret = send_cap_msg(&arg); if (ret < 0) { dout("error sending cap msg, must requeue %p\n", inode); delayed = 1; @@ -1227,15 +1259,42 @@ static inline int __send_flush_snap(struct inode *inode, struct ceph_cap_snap *capsnap, u32 mseq, u64 oldest_flush_tid) { - return send_cap_msg(session, ceph_vino(inode).ino, 0, - CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, - capsnap->dirty, 0, capsnap->cap_flush.tid, - oldest_flush_tid, 0, mseq, capsnap->size, 0, - &capsnap->mtime, &capsnap->atime, - &capsnap->ctime, capsnap->time_warp_seq, - capsnap->uid, capsnap->gid, capsnap->mode, - capsnap->xattr_version, capsnap->xattr_blob, - capsnap->follows, capsnap->inline_data); + struct cap_msg_args arg; + + arg.session = session; + arg.ino = ceph_vino(inode).ino; + arg.cid = 0; + arg.follows = capsnap->follows; + arg.flush_tid = capsnap->cap_flush.tid; + arg.oldest_flush_tid = oldest_flush_tid; + + arg.size = capsnap->size; + arg.max_size = 0; + arg.xattr_version = capsnap->xattr_version; + arg.xattr_buf = capsnap->xattr_blob; + + arg.atime = capsnap->atime; + arg.mtime = capsnap->mtime; + arg.ctime = capsnap->ctime; + + arg.op = CEPH_CAP_OP_FLUSHSNAP; + arg.caps = capsnap->issued; + arg.wanted = 0; + arg.dirty = capsnap->dirty; + + arg.seq = 0; + arg.issue_seq = 0; + arg.mseq = mseq; + arg.time_warp_seq = capsnap->time_warp_seq; + + arg.uid = capsnap->uid; + arg.gid = capsnap->gid; + arg.mode = capsnap->mode; + + arg.inline_data = capsnap->inline_data; + arg.flags = 0; + + return send_cap_msg(&arg); } /* @@ -1858,9 +1917,9 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, sent++; /* __send_cap drops i_ceph_lock */ - delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, - want, retain, flushing, - flush_tid, oldest_flush_tid); + delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false, + cap_used, want, retain, flushing, + flush_tid, oldest_flush_tid); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -1924,9 +1983,9 @@ static int try_flush_caps(struct inode *inode, u64 *ptid) &flush_tid, &oldest_flush_tid); /* __send_cap drops i_ceph_lock */ - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, - (cap->issued | cap->implemented), - flushing, flush_tid, oldest_flush_tid); + delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true, + used, want, (cap->issued | cap->implemented), + flushing, flush_tid, oldest_flush_tid); if (delayed) { spin_lock(&ci->i_ceph_lock); @@ -1996,7 +2055,7 @@ static int unsafe_request_wait(struct inode *inode) } spin_unlock(&ci->i_unsafe_lock); - dout("unsafe_requeset_wait %p wait on tid %llu %llu\n", + dout("unsafe_request_wait %p wait on tid %llu %llu\n", inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL); if (req1) { ret = !wait_for_completion_timeout(&req1->r_safe_completion, @@ -2119,7 +2178,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc, inode, cap, cf->tid, ceph_cap_string(cf->caps)); ci->i_ceph_flags |= CEPH_I_NODELAY; ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), + false, __ceph_caps_used(ci), __ceph_caps_wanted(ci), cap->issued | cap->implemented, cf->caps, cf->tid, oldest_flush_tid); @@ -2479,6 +2538,27 @@ static void check_max_size(struct inode *inode, loff_t endoff) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); } +int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got) +{ + int ret, err = 0; + + BUG_ON(need & ~CEPH_CAP_FILE_RD); + BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)); + ret = ceph_pool_perm_check(ci, need); + if (ret < 0) + return ret; + + ret = try_get_cap_refs(ci, need, want, 0, true, got, &err); + if (ret) { + if (err == -EAGAIN) { + ret = 0; + } else if (err < 0) { + ret = err; + } + } + return ret; +} + /* * Wait for caps, and take cap references. If we can't get a WR cap * due to a small max_size, make sure we check_max_size (and possibly @@ -2507,9 +2587,15 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, if (err < 0) ret = err; } else { - ret = wait_event_interruptible(ci->i_cap_wq, - try_get_cap_refs(ci, need, want, endoff, - true, &_got, &err)); + DEFINE_WAIT_FUNC(wait, woken_wake_function); + add_wait_queue(&ci->i_cap_wq, &wait); + + while (!try_get_cap_refs(ci, need, want, endoff, + true, &_got, &err)) + wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT); + + remove_wait_queue(&ci->i_cap_wq, &wait); + if (err == -EAGAIN) continue; if (err < 0) @@ -3570,6 +3656,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, cap->cap_id = le64_to_cpu(h->cap_id); cap->mseq = mseq; cap->seq = seq; + cap->issue_seq = seq; spin_lock(&session->s_cap_lock); list_add_tail(&cap->session_caps, &session->s_cap_releases); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 159fc8f1a6a00dbc5b291ed6e17d583dd90a269e..045d30d2662485a4207945757659383fb314fa10 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -454,71 +454,60 @@ enum { * only return a short read to the caller if we hit EOF. */ static int striped_read(struct inode *inode, - u64 off, u64 len, + u64 pos, u64 len, struct page **pages, int num_pages, - int *checkeof) + int page_align, int *checkeof) { struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); - u64 pos, this_len, left; + u64 this_len; loff_t i_size; - int page_align, pages_left; - int read, ret; - struct page **page_pos; + int page_idx; + int ret, read = 0; bool hit_stripe, was_short; /* * we may need to do multiple reads. not atomic, unfortunately. */ - pos = off; - left = len; - page_pos = pages; - pages_left = num_pages; - read = 0; - more: - page_align = pos & ~PAGE_MASK; - this_len = left; + this_len = len; + page_idx = (page_align + read) >> PAGE_SHIFT; ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), &ci->i_layout, pos, &this_len, - ci->i_truncate_seq, - ci->i_truncate_size, - page_pos, pages_left, page_align); + ci->i_truncate_seq, ci->i_truncate_size, + pages + page_idx, num_pages - page_idx, + ((page_align + read) & ~PAGE_MASK)); if (ret == -ENOENT) ret = 0; - hit_stripe = this_len < left; + hit_stripe = this_len < len; was_short = ret >= 0 && ret < this_len; - dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read, + dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read, ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); i_size = i_size_read(inode); if (ret >= 0) { - int didpages; if (was_short && (pos + ret < i_size)) { int zlen = min(this_len - ret, i_size - pos - ret); - int zoff = (off & ~PAGE_MASK) + read + ret; + int zoff = page_align + read + ret; dout(" zero gap %llu to %llu\n", - pos + ret, pos + ret + zlen); + pos + ret, pos + ret + zlen); ceph_zero_page_vector_range(zoff, zlen, pages); ret += zlen; } - didpages = (page_align + ret) >> PAGE_SHIFT; + read += ret; pos += ret; - read = pos - off; - left -= ret; - page_pos += didpages; - pages_left -= didpages; + len -= ret; /* hit stripe and need continue*/ - if (left && hit_stripe && pos < i_size) + if (len && hit_stripe && pos < i_size) goto more; } if (read > 0) { ret = read; /* did we bounce off eof? */ - if (pos + left > i_size) + if (pos + len > i_size) *checkeof = CHECK_EOF; } @@ -532,15 +521,16 @@ static int striped_read(struct inode *inode, * * If the read spans object boundary, just do multiple reads. */ -static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, - int *checkeof) +static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, + int *checkeof) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct page **pages; u64 off = iocb->ki_pos; - int num_pages, ret; - size_t len = iov_iter_count(i); + int num_pages; + ssize_t ret; + size_t len = iov_iter_count(to); dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len, @@ -559,35 +549,56 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, if (ret < 0) return ret; - num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - ret = striped_read(inode, off, len, pages, - num_pages, checkeof); - if (ret > 0) { - int l, k = 0; - size_t left = ret; - - while (left) { - size_t page_off = off & ~PAGE_MASK; - size_t copy = min_t(size_t, left, - PAGE_SIZE - page_off); - l = copy_page_to_iter(pages[k++], page_off, copy, i); - off += l; - left -= l; - if (l < copy) - break; + if (unlikely(to->type & ITER_PIPE)) { + size_t page_off; + ret = iov_iter_get_pages_alloc(to, &pages, len, + &page_off); + if (ret <= 0) + return -ENOMEM; + num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE); + + ret = striped_read(inode, off, ret, pages, num_pages, + page_off, checkeof); + if (ret > 0) { + iov_iter_advance(to, ret); + off += ret; + } else { + iov_iter_advance(to, 0); + } + ceph_put_page_vector(pages, num_pages, false); + } else { + num_pages = calc_pages_for(off, len); + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + ret = striped_read(inode, off, len, pages, num_pages, + (off & ~PAGE_MASK), checkeof); + if (ret > 0) { + int l, k = 0; + size_t left = ret; + + while (left) { + size_t page_off = off & ~PAGE_MASK; + size_t copy = min_t(size_t, left, + PAGE_SIZE - page_off); + l = copy_page_to_iter(pages[k++], page_off, + copy, to); + off += l; + left -= l; + if (l < copy) + break; + } } + ceph_release_page_vector(pages, num_pages); } - ceph_release_page_vector(pages, num_pages); if (off > iocb->ki_pos) { ret = off - iocb->ki_pos; iocb->ki_pos = off; } - dout("sync_read result %d\n", ret); + dout("sync_read result %zd\n", ret); return ret; } @@ -849,7 +860,7 @@ void ceph_sync_write_wait(struct inode *inode) dout("sync_write_wait on tid %llu (until %llu)\n", req->r_tid, last_tid); - wait_for_completion(&req->r_safe_completion); + wait_for_completion(&req->r_done_completion); ceph_osdc_put_request(req); spin_lock(&ci->i_unsafe_lock); @@ -902,7 +913,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, pos >> PAGE_SHIFT, (pos + count) >> PAGE_SHIFT); if (ret2 < 0) - dout("invalidate_inode_pages2_range returned %d\n", ret); + dout("invalidate_inode_pages2_range returned %d\n", ret2); flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_ONDISK | @@ -1245,8 +1256,9 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, ceph_cap_string(got)); - + current->journal_info = filp; ret = generic_file_read_iter(iocb, to); + current->journal_info = NULL; } dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); @@ -1766,6 +1778,7 @@ const struct file_operations ceph_file_fops = { .fsync = ceph_fsync, .lock = ceph_lock, .flock = ceph_flock, + .splice_read = generic_file_splice_read, .splice_write = iter_file_splice_write, .unlocked_ioctl = ceph_ioctl, .compat_ioctl = ceph_ioctl, diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 815acd1a56d461b944c06d117457ccd99c42e2f0..4f49253387a0a59d238a041b1ec8fc9fd7185c40 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2100,17 +2100,26 @@ static int __do_request(struct ceph_mds_client *mdsc, err = -EIO; goto finish; } + if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { + if (mdsc->mdsmap_err) { + err = mdsc->mdsmap_err; + dout("do_request mdsmap err %d\n", err); + goto finish; + } + if (!(mdsc->fsc->mount_options->flags & + CEPH_MOUNT_OPT_MOUNTWAIT) && + !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { + err = -ENOENT; + pr_info("probably no mds server is up\n"); + goto finish; + } + } put_request_session(req); mds = __choose_mds(mdsc, req); if (mds < 0 || ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { - if (mdsc->mdsmap_err) { - err = mdsc->mdsmap_err; - dout("do_request mdsmap err %d\n", err); - goto finish; - } dout("do_request no mds or not active, waiting for map\n"); list_add(&req->r_wait, &mdsc->waiting_for_map); goto out; @@ -3943,13 +3952,13 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, } -static int verify_authorizer_reply(struct ceph_connection *con, int len) +static int verify_authorizer_reply(struct ceph_connection *con) { struct ceph_mds_session *s = con->private; struct ceph_mds_client *mdsc = s->s_mdsc; struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; - return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len); + return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer); } static int invalidate_authorizer(struct ceph_connection *con) diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c index 8c3591a7fbaeef67244f9ad678ebeaf005a64734..5454e2327a5f77874d63344802709697ceb64de1 100644 --- a/fs/ceph/mdsmap.c +++ b/fs/ceph/mdsmap.c @@ -42,6 +42,60 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m) return i; } +#define __decode_and_drop_type(p, end, type, bad) \ + do { \ + if (*p + sizeof(type) > end) \ + goto bad; \ + *p += sizeof(type); \ + } while (0) + +#define __decode_and_drop_set(p, end, type, bad) \ + do { \ + u32 n; \ + size_t need; \ + ceph_decode_32_safe(p, end, n, bad); \ + need = sizeof(type) * n; \ + ceph_decode_need(p, end, need, bad); \ + *p += need; \ + } while (0) + +#define __decode_and_drop_map(p, end, ktype, vtype, bad) \ + do { \ + u32 n; \ + size_t need; \ + ceph_decode_32_safe(p, end, n, bad); \ + need = (sizeof(ktype) + sizeof(vtype)) * n; \ + ceph_decode_need(p, end, need, bad); \ + *p += need; \ + } while (0) + + +static int __decode_and_drop_compat_set(void **p, void* end) +{ + int i; + /* compat, ro_compat, incompat*/ + for (i = 0; i < 3; i++) { + u32 n; + ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad); + /* mask */ + *p += sizeof(u64); + /* names (map) */ + n = ceph_decode_32(p); + while (n-- > 0) { + u32 len; + ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), + bad); + *p += sizeof(u64); + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, bad); + *p += len; + } + } + return 0; +bad: + return -1; +} + /* * Decode an MDS map * @@ -55,6 +109,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) int i, j, n; int err = -EINVAL; u8 mdsmap_v, mdsmap_cv; + u16 mdsmap_ev; m = kzalloc(sizeof(*m), GFP_NOFS); if (m == NULL) @@ -83,7 +138,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) m->m_info = kcalloc(m->m_max_mds, sizeof(*m->m_info), GFP_NOFS); if (m->m_info == NULL) - goto badmem; + goto nomem; /* pick out active nodes from mds_info (state > 0) */ n = ceph_decode_32(p); @@ -166,7 +221,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) info->export_targets = kcalloc(num_export_targets, sizeof(u32), GFP_NOFS); if (info->export_targets == NULL) - goto badmem; + goto nomem; for (j = 0; j < num_export_targets; j++) info->export_targets[j] = ceph_decode_32(&pexport_targets); @@ -180,24 +235,104 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) m->m_num_data_pg_pools = n; m->m_data_pg_pools = kcalloc(n, sizeof(u64), GFP_NOFS); if (!m->m_data_pg_pools) - goto badmem; + goto nomem; ceph_decode_need(p, end, sizeof(u64)*(n+1), bad); for (i = 0; i < n; i++) m->m_data_pg_pools[i] = ceph_decode_64(p); m->m_cas_pg_pool = ceph_decode_64(p); + m->m_enabled = m->m_epoch > 1; + + mdsmap_ev = 1; + if (mdsmap_v >= 2) { + ceph_decode_16_safe(p, end, mdsmap_ev, bad_ext); + } + if (mdsmap_ev >= 3) { + if (__decode_and_drop_compat_set(p, end) < 0) + goto bad_ext; + } + /* metadata_pool */ + if (mdsmap_ev < 5) { + __decode_and_drop_type(p, end, u32, bad_ext); + } else { + __decode_and_drop_type(p, end, u64, bad_ext); + } - /* ok, we don't care about the rest. */ + /* created + modified + tableserver */ + __decode_and_drop_type(p, end, struct ceph_timespec, bad_ext); + __decode_and_drop_type(p, end, struct ceph_timespec, bad_ext); + __decode_and_drop_type(p, end, u32, bad_ext); + + /* in */ + { + int num_laggy = 0; + ceph_decode_32_safe(p, end, n, bad_ext); + ceph_decode_need(p, end, sizeof(u32) * n, bad_ext); + + for (i = 0; i < n; i++) { + s32 mds = ceph_decode_32(p); + if (mds >= 0 && mds < m->m_max_mds) { + if (m->m_info[mds].laggy) + num_laggy++; + } + } + m->m_num_laggy = num_laggy; + } + + /* inc */ + __decode_and_drop_map(p, end, u32, u32, bad_ext); + /* up */ + __decode_and_drop_map(p, end, u32, u64, bad_ext); + /* failed */ + __decode_and_drop_set(p, end, u32, bad_ext); + /* stopped */ + __decode_and_drop_set(p, end, u32, bad_ext); + + if (mdsmap_ev >= 4) { + /* last_failure_osd_epoch */ + __decode_and_drop_type(p, end, u32, bad_ext); + } + if (mdsmap_ev >= 6) { + /* ever_allowed_snaps */ + __decode_and_drop_type(p, end, u8, bad_ext); + /* explicitly_allowed_snaps */ + __decode_and_drop_type(p, end, u8, bad_ext); + } + if (mdsmap_ev >= 7) { + /* inline_data_enabled */ + __decode_and_drop_type(p, end, u8, bad_ext); + } + if (mdsmap_ev >= 8) { + u32 name_len; + /* enabled */ + ceph_decode_8_safe(p, end, m->m_enabled, bad_ext); + ceph_decode_32_safe(p, end, name_len, bad_ext); + ceph_decode_need(p, end, name_len, bad_ext); + *p += name_len; + } + /* damaged */ + if (mdsmap_ev >= 9) { + size_t need; + ceph_decode_32_safe(p, end, n, bad_ext); + need = sizeof(u32) * n; + ceph_decode_need(p, end, need, bad_ext); + *p += need; + m->m_damaged = n > 0; + } else { + m->m_damaged = false; + } +bad_ext: *p = end; dout("mdsmap_decode success epoch %u\n", m->m_epoch); return m; - -badmem: +nomem: err = -ENOMEM; + goto out_err; bad: pr_err("corrupt mdsmap\n"); print_hex_dump(KERN_DEBUG, "mdsmap: ", DUMP_PREFIX_OFFSET, 16, 1, start, end - start, true); +out_err: ceph_mdsmap_destroy(m); return ERR_PTR(err); } @@ -212,3 +347,19 @@ void ceph_mdsmap_destroy(struct ceph_mdsmap *m) kfree(m->m_data_pg_pools); kfree(m); } + +bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m) +{ + int i, nr_active = 0; + if (!m->m_enabled) + return false; + if (m->m_damaged) + return false; + if (m->m_num_laggy > 0) + return false; + for (i = 0; i < m->m_max_mds; i++) { + if (m->m_info[i].state == CEPH_MDS_STATE_ACTIVE) + nr_active++; + } + return nr_active > 0; +} diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 9ff5219d849e942c8f3a6a480d57daa52efeb8e0..8f8b41c2ef0f7d472afad0e1abcb4801ac11b221 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -593,6 +593,8 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci, capsnap->atime = inode->i_atime; capsnap->ctime = inode->i_ctime; capsnap->time_warp_seq = ci->i_time_warp_seq; + capsnap->truncate_size = ci->i_truncate_size; + capsnap->truncate_seq = ci->i_truncate_seq; if (capsnap->dirty_pages) { dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu " "still has %d dirty pages\n", inode, capsnap, diff --git a/fs/ceph/super.c b/fs/ceph/super.c index f2f76696ddca2065a5a4837a45fb1b8a0c25cf10..6bd20d707bfd885aff2f89a4b7266cc1c05fd5c8 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -137,6 +137,8 @@ enum { Opt_nofscache, Opt_poolperm, Opt_nopoolperm, + Opt_require_active_mds, + Opt_norequire_active_mds, #ifdef CONFIG_CEPH_FS_POSIX_ACL Opt_acl, #endif @@ -171,6 +173,8 @@ static match_table_t fsopt_tokens = { {Opt_nofscache, "nofsc"}, {Opt_poolperm, "poolperm"}, {Opt_nopoolperm, "nopoolperm"}, + {Opt_require_active_mds, "require_active_mds"}, + {Opt_norequire_active_mds, "norequire_active_mds"}, #ifdef CONFIG_CEPH_FS_POSIX_ACL {Opt_acl, "acl"}, #endif @@ -287,6 +291,12 @@ static int parse_fsopt_token(char *c, void *private) case Opt_nopoolperm: fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM; break; + case Opt_require_active_mds: + fsopt->flags &= ~CEPH_MOUNT_OPT_MOUNTWAIT; + break; + case Opt_norequire_active_mds: + fsopt->flags |= CEPH_MOUNT_OPT_MOUNTWAIT; + break; #ifdef CONFIG_CEPH_FS_POSIX_ACL case Opt_acl: fsopt->sb_flags |= MS_POSIXACL; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 931687f71a7cfab9e0e1618666e74821ffd704ac..3373b61faefd0fac7d240438e5bb2dca7e3433db 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -36,6 +36,7 @@ #define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */ #define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */ #define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */ +#define CEPH_MOUNT_OPT_MOUNTWAIT (1<<12) /* mount waits if no mds is up */ #define CEPH_MOUNT_OPT_DEFAULT CEPH_MOUNT_OPT_DCACHE @@ -180,6 +181,8 @@ struct ceph_cap_snap { u64 size; struct timespec mtime, atime, ctime; u64 time_warp_seq; + u64 truncate_size; + u32 truncate_seq; int writing; /* a sync write is still in progress */ int dirty_pages; /* dirty pages awaiting writeback */ bool inline_data; @@ -905,6 +908,8 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn, extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, loff_t endoff, int *got, struct page **pinned_page); +extern int ceph_try_get_caps(struct ceph_inode_info *ci, + int need, int want, int *got); /* for counting open files by mode */ extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode); diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h index 374bb1c4ef52925a3ce3b55b829587503a0088fa..a6747789fe5c25679d99fcc42b74042ea29251a3 100644 --- a/include/linux/ceph/auth.h +++ b/include/linux/ceph/auth.h @@ -64,7 +64,7 @@ struct ceph_auth_client_ops { int (*update_authorizer)(struct ceph_auth_client *ac, int peer_type, struct ceph_auth_handshake *auth); int (*verify_authorizer_reply)(struct ceph_auth_client *ac, - struct ceph_authorizer *a, size_t len); + struct ceph_authorizer *a); void (*invalidate_authorizer)(struct ceph_auth_client *ac, int peer_type); @@ -118,8 +118,7 @@ extern int ceph_auth_update_authorizer(struct ceph_auth_client *ac, int peer_type, struct ceph_auth_handshake *a); extern int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac, - struct ceph_authorizer *a, - size_t len); + struct ceph_authorizer *a); extern void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac, int peer_type); diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index f96de8de4fa7a187ae81cedd330c4f1728d006f1..f4b2ee18f38cbd51d2ded16380e15da7697d9f89 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -653,6 +653,9 @@ enum { extern const char *ceph_cap_op_name(int op); +/* flags field in client cap messages (version >= 10) */ +#define CEPH_CLIENT_CAPS_SYNC (0x1) + /* * caps message, used for capability callbacks, acks, requests, etc. */ diff --git a/include/linux/ceph/mdsmap.h b/include/linux/ceph/mdsmap.h index 87ed09f548007eb5ce6e8e50651dfc0484387091..8ed5dc505fbb2e0672efd81089d1e35caf8e062c 100644 --- a/include/linux/ceph/mdsmap.h +++ b/include/linux/ceph/mdsmap.h @@ -31,6 +31,10 @@ struct ceph_mdsmap { int m_num_data_pg_pools; u64 *m_data_pg_pools; u64 m_cas_pg_pool; + + bool m_enabled; + bool m_damaged; + int m_num_laggy; }; static inline struct ceph_entity_addr * @@ -59,5 +63,6 @@ static inline bool ceph_mdsmap_is_laggy(struct ceph_mdsmap *m, int w) extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m); extern struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end); extern void ceph_mdsmap_destroy(struct ceph_mdsmap *m); +extern bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m); #endif diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 67bcef2ecddb823e1a9ffe75ab797e5da4473e68..c5c4c713e00f52640366014be16d32cb79bc2181 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -30,7 +30,7 @@ struct ceph_connection_operations { struct ceph_auth_handshake *(*get_authorizer) ( struct ceph_connection *con, int *proto, int force_new); - int (*verify_authorizer_reply) (struct ceph_connection *con, int len); + int (*verify_authorizer_reply) (struct ceph_connection *con); int (*invalidate_authorizer)(struct ceph_connection *con); /* there was some error on the socket (disconnect, whatever) */ diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index a8e66344bacc225642acc872c614b3017c314218..03a6653d329a01b90803d373f7a955230d787675 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -176,7 +176,7 @@ struct ceph_osd_request { struct kref r_kref; bool r_mempool; struct completion r_completion; - struct completion r_safe_completion; /* fsync waiter */ + struct completion r_done_completion; /* fsync waiter */ ceph_osdc_callback_t r_callback; ceph_osdc_unsafe_callback_t r_unsafe_callback; struct list_head r_unsafe_item; diff --git a/net/ceph/auth.c b/net/ceph/auth.c index c822b3ae1bd37b2ca9e4708b3d1ed6b7dd66f6c3..48bb8d95195b1df91d3f34d20d815d7021e10693 100644 --- a/net/ceph/auth.c +++ b/net/ceph/auth.c @@ -315,13 +315,13 @@ int ceph_auth_update_authorizer(struct ceph_auth_client *ac, EXPORT_SYMBOL(ceph_auth_update_authorizer); int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac, - struct ceph_authorizer *a, size_t len) + struct ceph_authorizer *a) { int ret = 0; mutex_lock(&ac->mutex); if (ac->ops && ac->ops->verify_authorizer_reply) - ret = ac->ops->verify_authorizer_reply(ac, a, len); + ret = ac->ops->verify_authorizer_reply(ac, a); mutex_unlock(&ac->mutex); return ret; } diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c index a0905f04bd13f3250f51de5a7600f4089d3053a4..2034fb9266700e6b456b141db9b5d264aed77d62 100644 --- a/net/ceph/auth_x.c +++ b/net/ceph/auth_x.c @@ -39,56 +39,58 @@ static int ceph_x_should_authenticate(struct ceph_auth_client *ac) return need != 0; } +static int ceph_x_encrypt_offset(void) +{ + return sizeof(u32) + sizeof(struct ceph_x_encrypt_header); +} + static int ceph_x_encrypt_buflen(int ilen) { - return sizeof(struct ceph_x_encrypt_header) + ilen + 16 + - sizeof(u32); + return ceph_x_encrypt_offset() + ilen + 16; } -static int ceph_x_encrypt(struct ceph_crypto_key *secret, - void *ibuf, int ilen, void *obuf, size_t olen) +static int ceph_x_encrypt(struct ceph_crypto_key *secret, void *buf, + int buf_len, int plaintext_len) { - struct ceph_x_encrypt_header head = { - .struct_v = 1, - .magic = cpu_to_le64(CEPHX_ENC_MAGIC) - }; - size_t len = olen - sizeof(u32); + struct ceph_x_encrypt_header *hdr = buf + sizeof(u32); + int ciphertext_len; int ret; - ret = ceph_encrypt2(secret, obuf + sizeof(u32), &len, - &head, sizeof(head), ibuf, ilen); + hdr->struct_v = 1; + hdr->magic = cpu_to_le64(CEPHX_ENC_MAGIC); + + ret = ceph_crypt(secret, true, buf + sizeof(u32), buf_len - sizeof(u32), + plaintext_len + sizeof(struct ceph_x_encrypt_header), + &ciphertext_len); if (ret) return ret; - ceph_encode_32(&obuf, len); - return len + sizeof(u32); + + ceph_encode_32(&buf, ciphertext_len); + return sizeof(u32) + ciphertext_len; } -static int ceph_x_decrypt(struct ceph_crypto_key *secret, - void **p, void *end, void **obuf, size_t olen) +static int ceph_x_decrypt(struct ceph_crypto_key *secret, void **p, void *end) { - struct ceph_x_encrypt_header head; - size_t head_len = sizeof(head); - int len, ret; - - len = ceph_decode_32(p); - if (*p + len > end) - return -EINVAL; + struct ceph_x_encrypt_header *hdr = *p + sizeof(u32); + int ciphertext_len, plaintext_len; + int ret; - dout("ceph_x_decrypt len %d\n", len); - if (*obuf == NULL) { - *obuf = kmalloc(len, GFP_NOFS); - if (!*obuf) - return -ENOMEM; - olen = len; - } + ceph_decode_32_safe(p, end, ciphertext_len, e_inval); + ceph_decode_need(p, end, ciphertext_len, e_inval); - ret = ceph_decrypt2(secret, &head, &head_len, *obuf, &olen, *p, len); + ret = ceph_crypt(secret, false, *p, end - *p, ciphertext_len, + &plaintext_len); if (ret) return ret; - if (head.struct_v != 1 || le64_to_cpu(head.magic) != CEPHX_ENC_MAGIC) + + if (hdr->struct_v != 1 || le64_to_cpu(hdr->magic) != CEPHX_ENC_MAGIC) return -EPERM; - *p += len; - return olen; + + *p += ciphertext_len; + return plaintext_len - sizeof(struct ceph_x_encrypt_header); + +e_inval: + return -EINVAL; } /* @@ -143,13 +145,10 @@ static int process_one_ticket(struct ceph_auth_client *ac, int type; u8 tkt_struct_v, blob_struct_v; struct ceph_x_ticket_handler *th; - void *dbuf = NULL; void *dp, *dend; int dlen; char is_enc; struct timespec validity; - struct ceph_crypto_key old_key; - void *ticket_buf = NULL; void *tp, *tpend; void **ptp; struct ceph_crypto_key new_session_key; @@ -174,20 +173,17 @@ static int process_one_ticket(struct ceph_auth_client *ac, } /* blob for me */ - dlen = ceph_x_decrypt(secret, p, end, &dbuf, 0); - if (dlen <= 0) { - ret = dlen; + dp = *p + ceph_x_encrypt_offset(); + ret = ceph_x_decrypt(secret, p, end); + if (ret < 0) goto out; - } - dout(" decrypted %d bytes\n", dlen); - dp = dbuf; - dend = dp + dlen; + dout(" decrypted %d bytes\n", ret); + dend = dp + ret; tkt_struct_v = ceph_decode_8(&dp); if (tkt_struct_v != 1) goto bad; - memcpy(&old_key, &th->session_key, sizeof(old_key)); ret = ceph_crypto_key_decode(&new_session_key, &dp, dend); if (ret) goto out; @@ -203,15 +199,13 @@ static int process_one_ticket(struct ceph_auth_client *ac, ceph_decode_8_safe(p, end, is_enc, bad); if (is_enc) { /* encrypted */ - dout(" encrypted ticket\n"); - dlen = ceph_x_decrypt(&old_key, p, end, &ticket_buf, 0); - if (dlen < 0) { - ret = dlen; + tp = *p + ceph_x_encrypt_offset(); + ret = ceph_x_decrypt(&th->session_key, p, end); + if (ret < 0) goto out; - } - tp = ticket_buf; + dout(" encrypted ticket, decrypted %d bytes\n", ret); ptp = &tp; - tpend = *ptp + dlen; + tpend = tp + ret; } else { /* unencrypted */ ptp = p; @@ -242,8 +236,6 @@ static int process_one_ticket(struct ceph_auth_client *ac, xi->have_keys |= th->service; out: - kfree(ticket_buf); - kfree(dbuf); return ret; bad: @@ -294,7 +286,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, { int maxlen; struct ceph_x_authorize_a *msg_a; - struct ceph_x_authorize_b msg_b; + struct ceph_x_authorize_b *msg_b; void *p, *end; int ret; int ticket_blob_len = @@ -308,8 +300,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, if (ret) goto out_au; - maxlen = sizeof(*msg_a) + sizeof(msg_b) + - ceph_x_encrypt_buflen(ticket_blob_len); + maxlen = sizeof(*msg_a) + ticket_blob_len + + ceph_x_encrypt_buflen(sizeof(*msg_b)); dout(" need len %d\n", maxlen); if (au->buf && au->buf->alloc_len < maxlen) { ceph_buffer_put(au->buf); @@ -343,18 +335,19 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, p += ticket_blob_len; end = au->buf->vec.iov_base + au->buf->vec.iov_len; + msg_b = p + ceph_x_encrypt_offset(); + msg_b->struct_v = 1; get_random_bytes(&au->nonce, sizeof(au->nonce)); - msg_b.struct_v = 1; - msg_b.nonce = cpu_to_le64(au->nonce); - ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b), - p, end - p); + msg_b->nonce = cpu_to_le64(au->nonce); + ret = ceph_x_encrypt(&au->session_key, p, end - p, sizeof(*msg_b)); if (ret < 0) goto out_au; + p += ret; + WARN_ON(p > end); au->buf->vec.iov_len = p - au->buf->vec.iov_base; dout(" built authorizer nonce %llx len %d\n", au->nonce, (int)au->buf->vec.iov_len); - BUG_ON(au->buf->vec.iov_len > maxlen); return 0; out_au: @@ -452,8 +445,9 @@ static int ceph_x_build_request(struct ceph_auth_client *ac, if (need & CEPH_ENTITY_TYPE_AUTH) { struct ceph_x_authenticate *auth = (void *)(head + 1); void *p = auth + 1; - struct ceph_x_challenge_blob tmp; - char tmp_enc[40]; + void *enc_buf = xi->auth_authorizer.enc_buf; + struct ceph_x_challenge_blob *blob = enc_buf + + ceph_x_encrypt_offset(); u64 *u; if (p > end) @@ -464,16 +458,16 @@ static int ceph_x_build_request(struct ceph_auth_client *ac, /* encrypt and hash */ get_random_bytes(&auth->client_challenge, sizeof(u64)); - tmp.client_challenge = auth->client_challenge; - tmp.server_challenge = cpu_to_le64(xi->server_challenge); - ret = ceph_x_encrypt(&xi->secret, &tmp, sizeof(tmp), - tmp_enc, sizeof(tmp_enc)); + blob->client_challenge = auth->client_challenge; + blob->server_challenge = cpu_to_le64(xi->server_challenge); + ret = ceph_x_encrypt(&xi->secret, enc_buf, CEPHX_AU_ENC_BUF_LEN, + sizeof(*blob)); if (ret < 0) return ret; auth->struct_v = 1; auth->key = 0; - for (u = (u64 *)tmp_enc; u + 1 <= (u64 *)(tmp_enc + ret); u++) + for (u = (u64 *)enc_buf; u + 1 <= (u64 *)(enc_buf + ret); u++) auth->key ^= *(__le64 *)u; dout(" server_challenge %llx client_challenge %llx key %llx\n", xi->server_challenge, le64_to_cpu(auth->client_challenge), @@ -600,8 +594,8 @@ static int ceph_x_create_authorizer( auth->authorizer = (struct ceph_authorizer *) au; auth->authorizer_buf = au->buf->vec.iov_base; auth->authorizer_buf_len = au->buf->vec.iov_len; - auth->authorizer_reply_buf = au->reply_buf; - auth->authorizer_reply_buf_len = sizeof (au->reply_buf); + auth->authorizer_reply_buf = au->enc_buf; + auth->authorizer_reply_buf_len = CEPHX_AU_ENC_BUF_LEN; auth->sign_message = ac->ops->sign_message; auth->check_message_signature = ac->ops->check_message_signature; @@ -629,27 +623,25 @@ static int ceph_x_update_authorizer( } static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac, - struct ceph_authorizer *a, size_t len) + struct ceph_authorizer *a) { struct ceph_x_authorizer *au = (void *)a; - int ret = 0; - struct ceph_x_authorize_reply reply; - void *preply = &reply; - void *p = au->reply_buf; - void *end = p + sizeof(au->reply_buf); + void *p = au->enc_buf; + struct ceph_x_authorize_reply *reply = p + ceph_x_encrypt_offset(); + int ret; - ret = ceph_x_decrypt(&au->session_key, &p, end, &preply, sizeof(reply)); + ret = ceph_x_decrypt(&au->session_key, &p, p + CEPHX_AU_ENC_BUF_LEN); if (ret < 0) return ret; - if (ret != sizeof(reply)) + if (ret != sizeof(*reply)) return -EPERM; - if (au->nonce + 1 != le64_to_cpu(reply.nonce_plus_one)) + if (au->nonce + 1 != le64_to_cpu(reply->nonce_plus_one)) ret = -EPERM; else ret = 0; dout("verify_authorizer_reply nonce %llx got %llx ret %d\n", - au->nonce, le64_to_cpu(reply.nonce_plus_one), ret); + au->nonce, le64_to_cpu(reply->nonce_plus_one), ret); return ret; } @@ -704,35 +696,48 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac, invalidate_ticket(ac, CEPH_ENTITY_TYPE_AUTH); } -static int calcu_signature(struct ceph_x_authorizer *au, - struct ceph_msg *msg, __le64 *sig) +static int calc_signature(struct ceph_x_authorizer *au, struct ceph_msg *msg, + __le64 *psig) { + void *enc_buf = au->enc_buf; + struct { + __le32 len; + __le32 header_crc; + __le32 front_crc; + __le32 middle_crc; + __le32 data_crc; + } __packed *sigblock = enc_buf + ceph_x_encrypt_offset(); int ret; - char tmp_enc[40]; - __le32 tmp[5] = { - cpu_to_le32(16), msg->hdr.crc, msg->footer.front_crc, - msg->footer.middle_crc, msg->footer.data_crc, - }; - ret = ceph_x_encrypt(&au->session_key, &tmp, sizeof(tmp), - tmp_enc, sizeof(tmp_enc)); + + sigblock->len = cpu_to_le32(4*sizeof(u32)); + sigblock->header_crc = msg->hdr.crc; + sigblock->front_crc = msg->footer.front_crc; + sigblock->middle_crc = msg->footer.middle_crc; + sigblock->data_crc = msg->footer.data_crc; + ret = ceph_x_encrypt(&au->session_key, enc_buf, CEPHX_AU_ENC_BUF_LEN, + sizeof(*sigblock)); if (ret < 0) return ret; - *sig = *(__le64*)(tmp_enc + 4); + + *psig = *(__le64 *)(enc_buf + sizeof(u32)); return 0; } static int ceph_x_sign_message(struct ceph_auth_handshake *auth, struct ceph_msg *msg) { + __le64 sig; int ret; if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN)) return 0; - ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, - msg, &msg->footer.sig); - if (ret < 0) + ret = calc_signature((struct ceph_x_authorizer *)auth->authorizer, + msg, &sig); + if (ret) return ret; + + msg->footer.sig = sig; msg->footer.flags |= CEPH_MSG_FOOTER_SIGNED; return 0; } @@ -746,9 +751,9 @@ static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth, if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN)) return 0; - ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, - msg, &sig_check); - if (ret < 0) + ret = calc_signature((struct ceph_x_authorizer *)auth->authorizer, + msg, &sig_check); + if (ret) return ret; if (sig_check == msg->footer.sig) return 0; diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h index 21a5af904bae751391bf7d508dd75b1d2ee426e2..48e9ad41bd2aa70cdf42922a830f052d39e5ea7d 100644 --- a/net/ceph/auth_x.h +++ b/net/ceph/auth_x.h @@ -24,6 +24,7 @@ struct ceph_x_ticket_handler { unsigned long renew_after, expires; }; +#define CEPHX_AU_ENC_BUF_LEN 128 /* big enough for encrypted blob */ struct ceph_x_authorizer { struct ceph_authorizer base; @@ -32,7 +33,7 @@ struct ceph_x_authorizer { unsigned int service; u64 nonce; u64 secret_id; - char reply_buf[128]; /* big enough for encrypted blob */ + char enc_buf[CEPHX_AU_ENC_BUF_LEN] __aligned(8); }; struct ceph_x_info { diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index a421e905331af359569a2613bdaeeb21268731a2..130ab407c5ecf8ca5c0943759efff91c7bf258e8 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -17,10 +17,12 @@ # include # include # include +# include #else # include "crush_compat.h" # include "crush.h" # include "hash.h" +# include "mapper.h" #endif #include "crush_ln_table.h" diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c index db2847ac5f122988ce1625153fc219e74a3ddb8d..3949ce70be07bc90b1ee7e67ca95f09c1d5258f3 100644 --- a/net/ceph/crypto.c +++ b/net/ceph/crypto.c @@ -13,14 +13,60 @@ #include #include "crypto.h" +/* + * Set ->key and ->tfm. The rest of the key should be filled in before + * this function is called. + */ +static int set_secret(struct ceph_crypto_key *key, void *buf) +{ + unsigned int noio_flag; + int ret; + + key->key = NULL; + key->tfm = NULL; + + switch (key->type) { + case CEPH_CRYPTO_NONE: + return 0; /* nothing to do */ + case CEPH_CRYPTO_AES: + break; + default: + return -ENOTSUPP; + } + + WARN_ON(!key->len); + key->key = kmemdup(buf, key->len, GFP_NOIO); + if (!key->key) { + ret = -ENOMEM; + goto fail; + } + + /* crypto_alloc_skcipher() allocates with GFP_KERNEL */ + noio_flag = memalloc_noio_save(); + key->tfm = crypto_alloc_skcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC); + memalloc_noio_restore(noio_flag); + if (IS_ERR(key->tfm)) { + ret = PTR_ERR(key->tfm); + key->tfm = NULL; + goto fail; + } + + ret = crypto_skcipher_setkey(key->tfm, key->key, key->len); + if (ret) + goto fail; + + return 0; + +fail: + ceph_crypto_key_destroy(key); + return ret; +} + int ceph_crypto_key_clone(struct ceph_crypto_key *dst, const struct ceph_crypto_key *src) { memcpy(dst, src, sizeof(struct ceph_crypto_key)); - dst->key = kmemdup(src->key, src->len, GFP_NOFS); - if (!dst->key) - return -ENOMEM; - return 0; + return set_secret(dst, src->key); } int ceph_crypto_key_encode(struct ceph_crypto_key *key, void **p, void *end) @@ -37,16 +83,16 @@ int ceph_crypto_key_encode(struct ceph_crypto_key *key, void **p, void *end) int ceph_crypto_key_decode(struct ceph_crypto_key *key, void **p, void *end) { + int ret; + ceph_decode_need(p, end, 2*sizeof(u16) + sizeof(key->created), bad); key->type = ceph_decode_16(p); ceph_decode_copy(p, &key->created, sizeof(key->created)); key->len = ceph_decode_16(p); ceph_decode_need(p, end, key->len, bad); - key->key = kmalloc(key->len, GFP_NOFS); - if (!key->key) - return -ENOMEM; - ceph_decode_copy(p, key->key, key->len); - return 0; + ret = set_secret(key, *p); + *p += key->len; + return ret; bad: dout("failed to decode crypto key\n"); @@ -80,9 +126,14 @@ int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *inkey) return 0; } -static struct crypto_skcipher *ceph_crypto_alloc_cipher(void) +void ceph_crypto_key_destroy(struct ceph_crypto_key *key) { - return crypto_alloc_skcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC); + if (key) { + kfree(key->key); + key->key = NULL; + crypto_free_skcipher(key->tfm); + key->tfm = NULL; + } } static const u8 *aes_iv = (u8 *)CEPH_AES_IV; @@ -157,372 +208,82 @@ static void teardown_sgtable(struct sg_table *sgt) sg_free_table(sgt); } -static int ceph_aes_encrypt(const void *key, int key_len, - void *dst, size_t *dst_len, - const void *src, size_t src_len) +static int ceph_aes_crypt(const struct ceph_crypto_key *key, bool encrypt, + void *buf, int buf_len, int in_len, int *pout_len) { - struct scatterlist sg_in[2], prealloc_sg; - struct sg_table sg_out; - struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher(); - SKCIPHER_REQUEST_ON_STACK(req, tfm); - int ret; + SKCIPHER_REQUEST_ON_STACK(req, key->tfm); + struct sg_table sgt; + struct scatterlist prealloc_sg; char iv[AES_BLOCK_SIZE]; - size_t zero_padding = (0x10 - (src_len & 0x0f)); - char pad[16]; - - if (IS_ERR(tfm)) - return PTR_ERR(tfm); - - memset(pad, zero_padding, zero_padding); - - *dst_len = src_len + zero_padding; - - sg_init_table(sg_in, 2); - sg_set_buf(&sg_in[0], src, src_len); - sg_set_buf(&sg_in[1], pad, zero_padding); - ret = setup_sgtable(&sg_out, &prealloc_sg, dst, *dst_len); - if (ret) - goto out_tfm; - - crypto_skcipher_setkey((void *)tfm, key, key_len); - memcpy(iv, aes_iv, AES_BLOCK_SIZE); - - skcipher_request_set_tfm(req, tfm); - skcipher_request_set_callback(req, 0, NULL, NULL); - skcipher_request_set_crypt(req, sg_in, sg_out.sgl, - src_len + zero_padding, iv); - - /* - print_hex_dump(KERN_ERR, "enc key: ", DUMP_PREFIX_NONE, 16, 1, - key, key_len, 1); - print_hex_dump(KERN_ERR, "enc src: ", DUMP_PREFIX_NONE, 16, 1, - src, src_len, 1); - print_hex_dump(KERN_ERR, "enc pad: ", DUMP_PREFIX_NONE, 16, 1, - pad, zero_padding, 1); - */ - ret = crypto_skcipher_encrypt(req); - skcipher_request_zero(req); - if (ret < 0) { - pr_err("ceph_aes_crypt failed %d\n", ret); - goto out_sg; - } - /* - print_hex_dump(KERN_ERR, "enc out: ", DUMP_PREFIX_NONE, 16, 1, - dst, *dst_len, 1); - */ - -out_sg: - teardown_sgtable(&sg_out); -out_tfm: - crypto_free_skcipher(tfm); - return ret; -} - -static int ceph_aes_encrypt2(const void *key, int key_len, void *dst, - size_t *dst_len, - const void *src1, size_t src1_len, - const void *src2, size_t src2_len) -{ - struct scatterlist sg_in[3], prealloc_sg; - struct sg_table sg_out; - struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher(); - SKCIPHER_REQUEST_ON_STACK(req, tfm); + int pad_byte = AES_BLOCK_SIZE - (in_len & (AES_BLOCK_SIZE - 1)); + int crypt_len = encrypt ? in_len + pad_byte : in_len; int ret; - char iv[AES_BLOCK_SIZE]; - size_t zero_padding = (0x10 - ((src1_len + src2_len) & 0x0f)); - char pad[16]; - if (IS_ERR(tfm)) - return PTR_ERR(tfm); - - memset(pad, zero_padding, zero_padding); - - *dst_len = src1_len + src2_len + zero_padding; - - sg_init_table(sg_in, 3); - sg_set_buf(&sg_in[0], src1, src1_len); - sg_set_buf(&sg_in[1], src2, src2_len); - sg_set_buf(&sg_in[2], pad, zero_padding); - ret = setup_sgtable(&sg_out, &prealloc_sg, dst, *dst_len); + WARN_ON(crypt_len > buf_len); + if (encrypt) + memset(buf + in_len, pad_byte, pad_byte); + ret = setup_sgtable(&sgt, &prealloc_sg, buf, crypt_len); if (ret) - goto out_tfm; - - crypto_skcipher_setkey((void *)tfm, key, key_len); - memcpy(iv, aes_iv, AES_BLOCK_SIZE); - - skcipher_request_set_tfm(req, tfm); - skcipher_request_set_callback(req, 0, NULL, NULL); - skcipher_request_set_crypt(req, sg_in, sg_out.sgl, - src1_len + src2_len + zero_padding, iv); - - /* - print_hex_dump(KERN_ERR, "enc key: ", DUMP_PREFIX_NONE, 16, 1, - key, key_len, 1); - print_hex_dump(KERN_ERR, "enc src1: ", DUMP_PREFIX_NONE, 16, 1, - src1, src1_len, 1); - print_hex_dump(KERN_ERR, "enc src2: ", DUMP_PREFIX_NONE, 16, 1, - src2, src2_len, 1); - print_hex_dump(KERN_ERR, "enc pad: ", DUMP_PREFIX_NONE, 16, 1, - pad, zero_padding, 1); - */ - ret = crypto_skcipher_encrypt(req); - skcipher_request_zero(req); - if (ret < 0) { - pr_err("ceph_aes_crypt2 failed %d\n", ret); - goto out_sg; - } - /* - print_hex_dump(KERN_ERR, "enc out: ", DUMP_PREFIX_NONE, 16, 1, - dst, *dst_len, 1); - */ - -out_sg: - teardown_sgtable(&sg_out); -out_tfm: - crypto_free_skcipher(tfm); - return ret; -} - -static int ceph_aes_decrypt(const void *key, int key_len, - void *dst, size_t *dst_len, - const void *src, size_t src_len) -{ - struct sg_table sg_in; - struct scatterlist sg_out[2], prealloc_sg; - struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher(); - SKCIPHER_REQUEST_ON_STACK(req, tfm); - char pad[16]; - char iv[AES_BLOCK_SIZE]; - int ret; - int last_byte; - - if (IS_ERR(tfm)) - return PTR_ERR(tfm); - - sg_init_table(sg_out, 2); - sg_set_buf(&sg_out[0], dst, *dst_len); - sg_set_buf(&sg_out[1], pad, sizeof(pad)); - ret = setup_sgtable(&sg_in, &prealloc_sg, src, src_len); - if (ret) - goto out_tfm; + return ret; - crypto_skcipher_setkey((void *)tfm, key, key_len); memcpy(iv, aes_iv, AES_BLOCK_SIZE); - - skcipher_request_set_tfm(req, tfm); + skcipher_request_set_tfm(req, key->tfm); skcipher_request_set_callback(req, 0, NULL, NULL); - skcipher_request_set_crypt(req, sg_in.sgl, sg_out, - src_len, iv); + skcipher_request_set_crypt(req, sgt.sgl, sgt.sgl, crypt_len, iv); /* - print_hex_dump(KERN_ERR, "dec key: ", DUMP_PREFIX_NONE, 16, 1, - key, key_len, 1); - print_hex_dump(KERN_ERR, "dec in: ", DUMP_PREFIX_NONE, 16, 1, - src, src_len, 1); + print_hex_dump(KERN_ERR, "key: ", DUMP_PREFIX_NONE, 16, 1, + key->key, key->len, 1); + print_hex_dump(KERN_ERR, " in: ", DUMP_PREFIX_NONE, 16, 1, + buf, crypt_len, 1); */ - ret = crypto_skcipher_decrypt(req); - skcipher_request_zero(req); - if (ret < 0) { - pr_err("ceph_aes_decrypt failed %d\n", ret); - goto out_sg; - } - - if (src_len <= *dst_len) - last_byte = ((char *)dst)[src_len - 1]; + if (encrypt) + ret = crypto_skcipher_encrypt(req); else - last_byte = pad[src_len - *dst_len - 1]; - if (last_byte <= 16 && src_len >= last_byte) { - *dst_len = src_len - last_byte; - } else { - pr_err("ceph_aes_decrypt got bad padding %d on src len %d\n", - last_byte, (int)src_len); - return -EPERM; /* bad padding */ - } - /* - print_hex_dump(KERN_ERR, "dec out: ", DUMP_PREFIX_NONE, 16, 1, - dst, *dst_len, 1); - */ - -out_sg: - teardown_sgtable(&sg_in); -out_tfm: - crypto_free_skcipher(tfm); - return ret; -} - -static int ceph_aes_decrypt2(const void *key, int key_len, - void *dst1, size_t *dst1_len, - void *dst2, size_t *dst2_len, - const void *src, size_t src_len) -{ - struct sg_table sg_in; - struct scatterlist sg_out[3], prealloc_sg; - struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher(); - SKCIPHER_REQUEST_ON_STACK(req, tfm); - char pad[16]; - char iv[AES_BLOCK_SIZE]; - int ret; - int last_byte; - - if (IS_ERR(tfm)) - return PTR_ERR(tfm); - - sg_init_table(sg_out, 3); - sg_set_buf(&sg_out[0], dst1, *dst1_len); - sg_set_buf(&sg_out[1], dst2, *dst2_len); - sg_set_buf(&sg_out[2], pad, sizeof(pad)); - ret = setup_sgtable(&sg_in, &prealloc_sg, src, src_len); - if (ret) - goto out_tfm; - - crypto_skcipher_setkey((void *)tfm, key, key_len); - memcpy(iv, aes_iv, AES_BLOCK_SIZE); - - skcipher_request_set_tfm(req, tfm); - skcipher_request_set_callback(req, 0, NULL, NULL); - skcipher_request_set_crypt(req, sg_in.sgl, sg_out, - src_len, iv); - - /* - print_hex_dump(KERN_ERR, "dec key: ", DUMP_PREFIX_NONE, 16, 1, - key, key_len, 1); - print_hex_dump(KERN_ERR, "dec in: ", DUMP_PREFIX_NONE, 16, 1, - src, src_len, 1); - */ - ret = crypto_skcipher_decrypt(req); + ret = crypto_skcipher_decrypt(req); skcipher_request_zero(req); - if (ret < 0) { - pr_err("ceph_aes_decrypt failed %d\n", ret); - goto out_sg; - } - - if (src_len <= *dst1_len) - last_byte = ((char *)dst1)[src_len - 1]; - else if (src_len <= *dst1_len + *dst2_len) - last_byte = ((char *)dst2)[src_len - *dst1_len - 1]; - else - last_byte = pad[src_len - *dst1_len - *dst2_len - 1]; - if (last_byte <= 16 && src_len >= last_byte) { - src_len -= last_byte; - } else { - pr_err("ceph_aes_decrypt got bad padding %d on src len %d\n", - last_byte, (int)src_len); - return -EPERM; /* bad padding */ - } - - if (src_len < *dst1_len) { - *dst1_len = src_len; - *dst2_len = 0; - } else { - *dst2_len = src_len - *dst1_len; + if (ret) { + pr_err("%s %scrypt failed: %d\n", __func__, + encrypt ? "en" : "de", ret); + goto out_sgt; } /* - print_hex_dump(KERN_ERR, "dec out1: ", DUMP_PREFIX_NONE, 16, 1, - dst1, *dst1_len, 1); - print_hex_dump(KERN_ERR, "dec out2: ", DUMP_PREFIX_NONE, 16, 1, - dst2, *dst2_len, 1); + print_hex_dump(KERN_ERR, "out: ", DUMP_PREFIX_NONE, 16, 1, + buf, crypt_len, 1); */ -out_sg: - teardown_sgtable(&sg_in); -out_tfm: - crypto_free_skcipher(tfm); - return ret; -} - - -int ceph_decrypt(struct ceph_crypto_key *secret, void *dst, size_t *dst_len, - const void *src, size_t src_len) -{ - switch (secret->type) { - case CEPH_CRYPTO_NONE: - if (*dst_len < src_len) - return -ERANGE; - memcpy(dst, src, src_len); - *dst_len = src_len; - return 0; - - case CEPH_CRYPTO_AES: - return ceph_aes_decrypt(secret->key, secret->len, dst, - dst_len, src, src_len); - - default: - return -EINVAL; - } -} - -int ceph_decrypt2(struct ceph_crypto_key *secret, - void *dst1, size_t *dst1_len, - void *dst2, size_t *dst2_len, - const void *src, size_t src_len) -{ - size_t t; - - switch (secret->type) { - case CEPH_CRYPTO_NONE: - if (*dst1_len + *dst2_len < src_len) - return -ERANGE; - t = min(*dst1_len, src_len); - memcpy(dst1, src, t); - *dst1_len = t; - src += t; - src_len -= t; - if (src_len) { - t = min(*dst2_len, src_len); - memcpy(dst2, src, t); - *dst2_len = t; + if (encrypt) { + *pout_len = crypt_len; + } else { + pad_byte = *(char *)(buf + in_len - 1); + if (pad_byte > 0 && pad_byte <= AES_BLOCK_SIZE && + in_len >= pad_byte) { + *pout_len = in_len - pad_byte; + } else { + pr_err("%s got bad padding %d on in_len %d\n", + __func__, pad_byte, in_len); + ret = -EPERM; + goto out_sgt; } - return 0; - - case CEPH_CRYPTO_AES: - return ceph_aes_decrypt2(secret->key, secret->len, - dst1, dst1_len, dst2, dst2_len, - src, src_len); - - default: - return -EINVAL; } -} - -int ceph_encrypt(struct ceph_crypto_key *secret, void *dst, size_t *dst_len, - const void *src, size_t src_len) -{ - switch (secret->type) { - case CEPH_CRYPTO_NONE: - if (*dst_len < src_len) - return -ERANGE; - memcpy(dst, src, src_len); - *dst_len = src_len; - return 0; - case CEPH_CRYPTO_AES: - return ceph_aes_encrypt(secret->key, secret->len, dst, - dst_len, src, src_len); - - default: - return -EINVAL; - } +out_sgt: + teardown_sgtable(&sgt); + return ret; } -int ceph_encrypt2(struct ceph_crypto_key *secret, void *dst, size_t *dst_len, - const void *src1, size_t src1_len, - const void *src2, size_t src2_len) +int ceph_crypt(const struct ceph_crypto_key *key, bool encrypt, + void *buf, int buf_len, int in_len, int *pout_len) { - switch (secret->type) { + switch (key->type) { case CEPH_CRYPTO_NONE: - if (*dst_len < src1_len + src2_len) - return -ERANGE; - memcpy(dst, src1, src1_len); - memcpy(dst + src1_len, src2, src2_len); - *dst_len = src1_len + src2_len; + *pout_len = in_len; return 0; - case CEPH_CRYPTO_AES: - return ceph_aes_encrypt2(secret->key, secret->len, dst, dst_len, - src1, src1_len, src2, src2_len); - + return ceph_aes_crypt(key, encrypt, buf, buf_len, in_len, + pout_len); default: - return -EINVAL; + return -ENOTSUPP; } } diff --git a/net/ceph/crypto.h b/net/ceph/crypto.h index 2e9cab09f37ba65c9c5961c12e10d2aded02413f..58d83aa7740f64a68fc25dcbde06791f5ce17496 100644 --- a/net/ceph/crypto.h +++ b/net/ceph/crypto.h @@ -12,37 +12,19 @@ struct ceph_crypto_key { struct ceph_timespec created; int len; void *key; + struct crypto_skcipher *tfm; }; -static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key) -{ - if (key) { - kfree(key->key); - key->key = NULL; - } -} - int ceph_crypto_key_clone(struct ceph_crypto_key *dst, const struct ceph_crypto_key *src); int ceph_crypto_key_encode(struct ceph_crypto_key *key, void **p, void *end); int ceph_crypto_key_decode(struct ceph_crypto_key *key, void **p, void *end); int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *in); +void ceph_crypto_key_destroy(struct ceph_crypto_key *key); /* crypto.c */ -int ceph_decrypt(struct ceph_crypto_key *secret, - void *dst, size_t *dst_len, - const void *src, size_t src_len); -int ceph_encrypt(struct ceph_crypto_key *secret, - void *dst, size_t *dst_len, - const void *src, size_t src_len); -int ceph_decrypt2(struct ceph_crypto_key *secret, - void *dst1, size_t *dst1_len, - void *dst2, size_t *dst2_len, - const void *src, size_t src_len); -int ceph_encrypt2(struct ceph_crypto_key *secret, - void *dst, size_t *dst_len, - const void *src1, size_t src1_len, - const void *src2, size_t src2_len); +int ceph_crypt(const struct ceph_crypto_key *key, bool encrypt, + void *buf, int buf_len, int in_len, int *pout_len); int ceph_crypto_init(void); void ceph_crypto_shutdown(void); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index a5502898ea33b0e4d74cfa8f12947c1d9466013c..770c52701efa3e08cec85c756fd7b8125f63ac91 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1393,15 +1393,9 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection return NULL; } - /* Can't hold the mutex while getting authorizer */ - mutex_unlock(&con->mutex); auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); - mutex_lock(&con->mutex); - if (IS_ERR(auth)) return auth; - if (con->state != CON_STATE_NEGOTIATING) - return ERR_PTR(-EAGAIN); con->auth_reply_buf = auth->authorizer_reply_buf; con->auth_reply_buf_len = auth->authorizer_reply_buf_len; @@ -2027,6 +2021,19 @@ static int process_connect(struct ceph_connection *con) dout("process_connect on %p tag %d\n", con, (int)con->in_tag); + if (con->auth_reply_buf) { + /* + * Any connection that defines ->get_authorizer() + * should also define ->verify_authorizer_reply(). + * See get_connect_authorizer(). + */ + ret = con->ops->verify_authorizer_reply(con); + if (ret < 0) { + con->error_msg = "bad authorize reply"; + return ret; + } + } + switch (con->in_reply.tag) { case CEPH_MSGR_TAG_FEATURES: pr_err("%s%lld %s feature set mismatch," diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index a8effc8b728092039fbe0edb033853db8daf9d8e..29a0ef351c5e068838fc1ed8f634439853ff0a3a 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -1028,21 +1028,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) err = -ENOMEM; monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, sizeof(struct ceph_mon_subscribe_ack), - GFP_NOFS, true); + GFP_KERNEL, true); if (!monc->m_subscribe_ack) goto out_auth; - monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128, GFP_NOFS, - true); + monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128, + GFP_KERNEL, true); if (!monc->m_subscribe) goto out_subscribe_ack; - monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS, - true); + monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, + GFP_KERNEL, true); if (!monc->m_auth_reply) goto out_subscribe; - monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true); + monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true); monc->pending_auth = 0; if (!monc->m_auth) goto out_auth_reply; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index e6ae15bc41b74dfc96e9d967139367b4ad3be952..842f049abb86d9233f23581ea1a0cc003a0d7ecb 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -460,7 +460,7 @@ static void request_init(struct ceph_osd_request *req) kref_init(&req->r_kref); init_completion(&req->r_completion); - init_completion(&req->r_safe_completion); + init_completion(&req->r_done_completion); RB_CLEAR_NODE(&req->r_node); RB_CLEAR_NODE(&req->r_mc_node); INIT_LIST_HEAD(&req->r_unsafe_item); @@ -1725,7 +1725,7 @@ static void submit_request(struct ceph_osd_request *req, bool wrlocked) __submit_request(req, wrlocked); } -static void __finish_request(struct ceph_osd_request *req) +static void finish_request(struct ceph_osd_request *req) { struct ceph_osd_client *osdc = req->r_osdc; struct ceph_osd *osd = req->r_osd; @@ -1747,12 +1747,6 @@ static void __finish_request(struct ceph_osd_request *req) ceph_msg_revoke_incoming(req->r_reply); } -static void finish_request(struct ceph_osd_request *req) -{ - __finish_request(req); - ceph_osdc_put_request(req); -} - static void __complete_request(struct ceph_osd_request *req) { if (req->r_callback) @@ -1770,9 +1764,9 @@ static void complete_request(struct ceph_osd_request *req, int err) dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err); req->r_result = err; - __finish_request(req); + finish_request(req); __complete_request(req); - complete_all(&req->r_safe_completion); + complete_all(&req->r_done_completion); ceph_osdc_put_request(req); } @@ -1798,6 +1792,8 @@ static void cancel_request(struct ceph_osd_request *req) cancel_map_check(req); finish_request(req); + complete_all(&req->r_done_completion); + ceph_osdc_put_request(req); } static void check_pool_dne(struct ceph_osd_request *req) @@ -2808,12 +2804,12 @@ static bool done_request(const struct ceph_osd_request *req, * ->r_unsafe_callback is set? yes no * * first reply is OK (needed r_cb/r_completion, r_cb/r_completion, - * any or needed/got safe) r_safe_completion r_safe_completion + * any or needed/got safe) r_done_completion r_done_completion * * first reply is unsafe r_unsafe_cb(true) (nothing) * * when we get the safe reply r_unsafe_cb(false), r_cb/r_completion, - * r_safe_completion r_safe_completion + * r_done_completion r_done_completion */ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) { @@ -2915,7 +2911,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) } if (done_request(req, &m)) { - __finish_request(req); + finish_request(req); if (req->r_linger) { WARN_ON(req->r_unsafe_callback); dout("req %p tid %llu cb (locked)\n", req, req->r_tid); @@ -2934,8 +2930,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) dout("req %p tid %llu cb\n", req, req->r_tid); __complete_request(req); } - if (m.flags & CEPH_OSD_FLAG_ONDISK) - complete_all(&req->r_safe_completion); + complete_all(&req->r_done_completion); ceph_osdc_put_request(req); } else { if (req->r_unsafe_callback) { @@ -3471,9 +3466,8 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, EXPORT_SYMBOL(ceph_osdc_start_request); /* - * Unregister a registered request. The request is not completed (i.e. - * no callbacks or wakeups) - higher layers are supposed to know what - * they are canceling. + * Unregister a registered request. The request is not completed: + * ->r_result isn't set and __complete_request() isn't called. */ void ceph_osdc_cancel_request(struct ceph_osd_request *req) { @@ -3500,9 +3494,6 @@ static int wait_request_timeout(struct ceph_osd_request *req, if (left <= 0) { left = left ?: -ETIMEDOUT; ceph_osdc_cancel_request(req); - - /* kludge - need to to wake ceph_osdc_sync() */ - complete_all(&req->r_safe_completion); } else { left = req->r_result; /* completed */ } @@ -3549,7 +3540,7 @@ void ceph_osdc_sync(struct ceph_osd_client *osdc) up_read(&osdc->lock); dout("%s waiting on req %p tid %llu last_tid %llu\n", __func__, req, req->r_tid, last_tid); - wait_for_completion(&req->r_safe_completion); + wait_for_completion(&req->r_done_completion); ceph_osdc_put_request(req); goto again; } @@ -4478,13 +4469,13 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, } -static int verify_authorizer_reply(struct ceph_connection *con, int len) +static int verify_authorizer_reply(struct ceph_connection *con) { struct ceph_osd *o = con->private; struct ceph_osd_client *osdc = o->o_osdc; struct ceph_auth_client *ac = osdc->client->monc.auth; - return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len); + return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer); } static int invalidate_authorizer(struct ceph_connection *con)