提交 8d7a8fe2 编写于 作者: L Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull ceph fixes from Sage Weil:
 "There is a pair of fixes for double-frees in the recent bundle for
  3.10, a couple of fixes for long-standing bugs (sleep while atomic and
  an endianness fix), and a locking fix that can be triggered when osds
  are going down"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  rbd: fix cleanup in rbd_add()
  rbd: don't destroy ceph_opts in rbd_add()
  ceph: ceph_pagelist_append might sleep while atomic
  ceph: add cpu_to_le32() calls when encoding a reconnect capability
  libceph: must hold mutex for reset_changed_osds()
...@@ -519,8 +519,8 @@ static const struct block_device_operations rbd_bd_ops = { ...@@ -519,8 +519,8 @@ static const struct block_device_operations rbd_bd_ops = {
}; };
/* /*
* Initialize an rbd client instance. * Initialize an rbd client instance. Success or not, this function
* We own *ceph_opts. * consumes ceph_opts.
*/ */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{ {
...@@ -675,7 +675,8 @@ static int parse_rbd_opts_token(char *c, void *private) ...@@ -675,7 +675,8 @@ static int parse_rbd_opts_token(char *c, void *private)
/* /*
* Get a ceph client with specific addr and configuration, if one does * Get a ceph client with specific addr and configuration, if one does
* not exist create it. * not exist create it. Either way, ceph_opts is consumed by this
* function.
*/ */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{ {
...@@ -4697,8 +4698,10 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) ...@@ -4697,8 +4698,10 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
return ret; return ret;
} }
/* Undo whatever state changes are made by v1 or v2 image probe */ /*
* Undo whatever state changes are made by v1 or v2 header info
* call.
*/
static void rbd_dev_unprobe(struct rbd_device *rbd_dev) static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{ {
struct rbd_image_header *header; struct rbd_image_header *header;
...@@ -4902,9 +4905,10 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping) ...@@ -4902,9 +4905,10 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
int tmp; int tmp;
/* /*
* Get the id from the image id object. If it's not a * Get the id from the image id object. Unless there's an
* format 2 image, we'll get ENOENT back, and we'll assume * error, rbd_dev->spec->image_id will be filled in with
* it's a format 1 image. * a dynamically-allocated string, and rbd_dev->image_format
* will be set to either 1 or 2.
*/ */
ret = rbd_dev_image_id(rbd_dev); ret = rbd_dev_image_id(rbd_dev);
if (ret) if (ret)
...@@ -4992,7 +4996,6 @@ static ssize_t rbd_add(struct bus_type *bus, ...@@ -4992,7 +4996,6 @@ static ssize_t rbd_add(struct bus_type *bus,
rc = PTR_ERR(rbdc); rc = PTR_ERR(rbdc);
goto err_out_args; goto err_out_args;
} }
ceph_opts = NULL; /* rbd_dev client now owns this */
/* pick the pool */ /* pick the pool */
osdc = &rbdc->client->osdc; osdc = &rbdc->client->osdc;
...@@ -5027,18 +5030,18 @@ static ssize_t rbd_add(struct bus_type *bus, ...@@ -5027,18 +5030,18 @@ static ssize_t rbd_add(struct bus_type *bus,
rbd_dev->mapping.read_only = read_only; rbd_dev->mapping.read_only = read_only;
rc = rbd_dev_device_setup(rbd_dev); rc = rbd_dev_device_setup(rbd_dev);
if (!rc) if (rc) {
return count; rbd_dev_image_release(rbd_dev);
goto err_out_module;
}
return count;
rbd_dev_image_release(rbd_dev);
err_out_rbd_dev: err_out_rbd_dev:
rbd_dev_destroy(rbd_dev); rbd_dev_destroy(rbd_dev);
err_out_client: err_out_client:
rbd_put_client(rbdc); rbd_put_client(rbdc);
err_out_args: err_out_args:
if (ceph_opts)
ceph_destroy_options(ceph_opts);
kfree(rbd_opts);
rbd_spec_put(spec); rbd_spec_put(spec);
err_out_module: err_out_module:
module_put(THIS_MODULE); module_put(THIS_MODULE);
......
...@@ -191,27 +191,23 @@ void ceph_count_locks(struct inode *inode, int *fcntl_count, int *flock_count) ...@@ -191,27 +191,23 @@ void ceph_count_locks(struct inode *inode, int *fcntl_count, int *flock_count)
} }
/** /**
* Encode the flock and fcntl locks for the given inode into the pagelist. * Encode the flock and fcntl locks for the given inode into the ceph_filelock
* Format is: #fcntl locks, sequential fcntl locks, #flock locks, * array. Must be called with lock_flocks() already held.
* sequential flock locks. * If we encounter more of a specific lock type than expected, return -ENOSPC.
* Must be called with lock_flocks() already held.
* If we encounter more of a specific lock type than expected,
* we return the value 1.
*/ */
int ceph_encode_locks(struct inode *inode, struct ceph_pagelist *pagelist, int ceph_encode_locks_to_buffer(struct inode *inode,
int num_fcntl_locks, int num_flock_locks) struct ceph_filelock *flocks,
int num_fcntl_locks, int num_flock_locks)
{ {
struct file_lock *lock; struct file_lock *lock;
struct ceph_filelock cephlock;
int err = 0; int err = 0;
int seen_fcntl = 0; int seen_fcntl = 0;
int seen_flock = 0; int seen_flock = 0;
int l = 0;
dout("encoding %d flock and %d fcntl locks", num_flock_locks, dout("encoding %d flock and %d fcntl locks", num_flock_locks,
num_fcntl_locks); num_fcntl_locks);
err = ceph_pagelist_append(pagelist, &num_fcntl_locks, sizeof(u32));
if (err)
goto fail;
for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) { for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
if (lock->fl_flags & FL_POSIX) { if (lock->fl_flags & FL_POSIX) {
++seen_fcntl; ++seen_fcntl;
...@@ -219,19 +215,12 @@ int ceph_encode_locks(struct inode *inode, struct ceph_pagelist *pagelist, ...@@ -219,19 +215,12 @@ int ceph_encode_locks(struct inode *inode, struct ceph_pagelist *pagelist,
err = -ENOSPC; err = -ENOSPC;
goto fail; goto fail;
} }
err = lock_to_ceph_filelock(lock, &cephlock); err = lock_to_ceph_filelock(lock, &flocks[l]);
if (err) if (err)
goto fail; goto fail;
err = ceph_pagelist_append(pagelist, &cephlock, ++l;
sizeof(struct ceph_filelock));
} }
if (err)
goto fail;
} }
err = ceph_pagelist_append(pagelist, &num_flock_locks, sizeof(u32));
if (err)
goto fail;
for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) { for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
if (lock->fl_flags & FL_FLOCK) { if (lock->fl_flags & FL_FLOCK) {
++seen_flock; ++seen_flock;
...@@ -239,19 +228,51 @@ int ceph_encode_locks(struct inode *inode, struct ceph_pagelist *pagelist, ...@@ -239,19 +228,51 @@ int ceph_encode_locks(struct inode *inode, struct ceph_pagelist *pagelist,
err = -ENOSPC; err = -ENOSPC;
goto fail; goto fail;
} }
err = lock_to_ceph_filelock(lock, &cephlock); err = lock_to_ceph_filelock(lock, &flocks[l]);
if (err) if (err)
goto fail; goto fail;
err = ceph_pagelist_append(pagelist, &cephlock, ++l;
sizeof(struct ceph_filelock));
} }
if (err)
goto fail;
} }
fail: fail:
return err; return err;
} }
/**
* Copy the encoded flock and fcntl locks into the pagelist.
* Format is: #fcntl locks, sequential fcntl locks, #flock locks,
* sequential flock locks.
* Returns zero on success.
*/
int ceph_locks_to_pagelist(struct ceph_filelock *flocks,
struct ceph_pagelist *pagelist,
int num_fcntl_locks, int num_flock_locks)
{
int err = 0;
__le32 nlocks;
nlocks = cpu_to_le32(num_fcntl_locks);
err = ceph_pagelist_append(pagelist, &nlocks, sizeof(nlocks));
if (err)
goto out_fail;
err = ceph_pagelist_append(pagelist, flocks,
num_fcntl_locks * sizeof(*flocks));
if (err)
goto out_fail;
nlocks = cpu_to_le32(num_flock_locks);
err = ceph_pagelist_append(pagelist, &nlocks, sizeof(nlocks));
if (err)
goto out_fail;
err = ceph_pagelist_append(pagelist,
&flocks[num_fcntl_locks],
num_flock_locks * sizeof(*flocks));
out_fail:
return err;
}
/* /*
* Given a pointer to a lock, convert it to a ceph filelock * Given a pointer to a lock, convert it to a ceph filelock
*/ */
......
...@@ -2478,39 +2478,44 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -2478,39 +2478,44 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
if (recon_state->flock) { if (recon_state->flock) {
int num_fcntl_locks, num_flock_locks; int num_fcntl_locks, num_flock_locks;
struct ceph_pagelist_cursor trunc_point; struct ceph_filelock *flocks;
ceph_pagelist_set_cursor(pagelist, &trunc_point); encode_again:
do { lock_flocks();
lock_flocks(); ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
ceph_count_locks(inode, &num_fcntl_locks, unlock_flocks();
&num_flock_locks); flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
rec.v2.flock_len = (2*sizeof(u32) + sizeof(struct ceph_filelock), GFP_NOFS);
(num_fcntl_locks+num_flock_locks) * if (!flocks) {
sizeof(struct ceph_filelock)); err = -ENOMEM;
unlock_flocks(); goto out_free;
}
/* pre-alloc pagelist */ lock_flocks();
ceph_pagelist_truncate(pagelist, &trunc_point); err = ceph_encode_locks_to_buffer(inode, flocks,
err = ceph_pagelist_append(pagelist, &rec, reclen); num_fcntl_locks,
if (!err) num_flock_locks);
err = ceph_pagelist_reserve(pagelist, unlock_flocks();
rec.v2.flock_len); if (err) {
kfree(flocks);
/* encode locks */ if (err == -ENOSPC)
if (!err) { goto encode_again;
lock_flocks(); goto out_free;
err = ceph_encode_locks(inode, }
pagelist, /*
num_fcntl_locks, * number of encoded locks is stable, so copy to pagelist
num_flock_locks); */
unlock_flocks(); rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
} (num_fcntl_locks+num_flock_locks) *
} while (err == -ENOSPC); sizeof(struct ceph_filelock));
err = ceph_pagelist_append(pagelist, &rec, reclen);
if (!err)
err = ceph_locks_to_pagelist(flocks, pagelist,
num_fcntl_locks,
num_flock_locks);
kfree(flocks);
} else { } else {
err = ceph_pagelist_append(pagelist, &rec, reclen); err = ceph_pagelist_append(pagelist, &rec, reclen);
} }
out_free: out_free:
kfree(path); kfree(path);
out_dput: out_dput:
......
...@@ -822,8 +822,13 @@ extern const struct export_operations ceph_export_ops; ...@@ -822,8 +822,13 @@ extern const struct export_operations ceph_export_ops;
extern int ceph_lock(struct file *file, int cmd, struct file_lock *fl); extern int ceph_lock(struct file *file, int cmd, struct file_lock *fl);
extern int ceph_flock(struct file *file, int cmd, struct file_lock *fl); extern int ceph_flock(struct file *file, int cmd, struct file_lock *fl);
extern void ceph_count_locks(struct inode *inode, int *p_num, int *f_num); extern void ceph_count_locks(struct inode *inode, int *p_num, int *f_num);
extern int ceph_encode_locks(struct inode *i, struct ceph_pagelist *p, extern int ceph_encode_locks_to_buffer(struct inode *inode,
int p_locks, int f_locks); struct ceph_filelock *flocks,
int num_fcntl_locks,
int num_flock_locks);
extern int ceph_locks_to_pagelist(struct ceph_filelock *flocks,
struct ceph_pagelist *pagelist,
int num_fcntl_locks, int num_flock_locks);
extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c); extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c);
/* debugfs.c */ /* debugfs.c */
......
...@@ -1675,13 +1675,13 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) ...@@ -1675,13 +1675,13 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
__register_request(osdc, req); __register_request(osdc, req);
__unregister_linger_request(osdc, req); __unregister_linger_request(osdc, req);
} }
reset_changed_osds(osdc);
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
if (needmap) { if (needmap) {
dout("%d requests for down osds, need new map\n", needmap); dout("%d requests for down osds, need new map\n", needmap);
ceph_monc_request_next_osdmap(&osdc->client->monc); ceph_monc_request_next_osdmap(&osdc->client->monc);
} }
reset_changed_osds(osdc);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册