提交 a1703154 编写于 作者: L Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  rbd: fix cleanup when trying to mount inexistent image
  net/ceph: make ceph_msgr_wq non-reentrant
  ceph: fsc->*_wq's aren't used in memory reclaim path
  ceph: Always free allocated memory in osdmap_decode()
  ceph: Makefile: Remove unnessary code
  ceph: associate requests with opening sessions
  ceph: drop redundant r_mds field
  ceph: implement DIRLAYOUTHASH feature to get dir layout from MDS
  ceph: add dir_layout to inode
...@@ -1790,18 +1790,29 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count) ...@@ -1790,18 +1790,29 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
rc = rbd_bus_add_dev(rbd_dev); rc = rbd_bus_add_dev(rbd_dev);
if (rc) if (rc)
goto err_out_disk; goto err_out_blkdev;
/* set up and announce blkdev mapping */ /* set up and announce blkdev mapping */
rc = rbd_init_disk(rbd_dev); rc = rbd_init_disk(rbd_dev);
if (rc) if (rc)
goto err_out_blkdev; goto err_out_bus;
return count; return count;
err_out_bus:
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
list_del_init(&rbd_dev->node);
mutex_unlock(&ctl_mutex);
/* this will also clean up rest of rbd_dev stuff */
rbd_bus_del_dev(rbd_dev);
kfree(options);
kfree(mon_dev_name);
return rc;
err_out_blkdev: err_out_blkdev:
unregister_blkdev(rbd_dev->major, rbd_dev->name); unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_disk:
rbd_free_disk(rbd_dev);
err_out_client: err_out_client:
rbd_put_client(rbd_dev); rbd_put_client(rbd_dev);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
......
...@@ -2,31 +2,10 @@ ...@@ -2,31 +2,10 @@
# Makefile for CEPH filesystem. # Makefile for CEPH filesystem.
# #
ifneq ($(KERNELRELEASE),)
obj-$(CONFIG_CEPH_FS) += ceph.o obj-$(CONFIG_CEPH_FS) += ceph.o
ceph-objs := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
export.o caps.o snap.o xattr.o \ export.o caps.o snap.o xattr.o \
mds_client.o mdsmap.o strings.o ceph_frag.o \ mds_client.o mdsmap.o strings.o ceph_frag.o \
debugfs.o debugfs.o
else
#Otherwise we were called directly from the command
# line; invoke the kernel build system.
KERNELDIR ?= /lib/modules/$(shell uname -r)/build
PWD := $(shell pwd)
default: all
all:
$(MAKE) -C $(KERNELDIR) M=$(PWD) CONFIG_CEPH_FS=m modules
modules_install:
$(MAKE) -C $(KERNELDIR) M=$(PWD) CONFIG_CEPH_FS=m modules_install
clean:
$(MAKE) -C $(KERNELDIR) M=$(PWD) clean
endif
...@@ -60,10 +60,13 @@ static int mdsc_show(struct seq_file *s, void *p) ...@@ -60,10 +60,13 @@ static int mdsc_show(struct seq_file *s, void *p)
for (rp = rb_first(&mdsc->request_tree); rp; rp = rb_next(rp)) { for (rp = rb_first(&mdsc->request_tree); rp; rp = rb_next(rp)) {
req = rb_entry(rp, struct ceph_mds_request, r_node); req = rb_entry(rp, struct ceph_mds_request, r_node);
if (req->r_request) if (req->r_request && req->r_session)
seq_printf(s, "%lld\tmds%d\t", req->r_tid, req->r_mds); seq_printf(s, "%lld\tmds%d\t", req->r_tid,
else req->r_session->s_mds);
else if (!req->r_request)
seq_printf(s, "%lld\t(no request)\t", req->r_tid); seq_printf(s, "%lld\t(no request)\t", req->r_tid);
else
seq_printf(s, "%lld\t(no session)\t", req->r_tid);
seq_printf(s, "%s", ceph_mds_op_name(req->r_op)); seq_printf(s, "%s", ceph_mds_op_name(req->r_op));
......
...@@ -1224,6 +1224,26 @@ void ceph_dentry_lru_del(struct dentry *dn) ...@@ -1224,6 +1224,26 @@ void ceph_dentry_lru_del(struct dentry *dn)
} }
} }
/*
* Return name hash for a given dentry. This is dependent on
* the parent directory's hash function.
*/
unsigned ceph_dentry_hash(struct dentry *dn)
{
struct inode *dir = dn->d_parent->d_inode;
struct ceph_inode_info *dci = ceph_inode(dir);
switch (dci->i_dir_layout.dl_dir_hash) {
case 0: /* for backward compat */
case CEPH_STR_HASH_LINUX:
return dn->d_name.hash;
default:
return ceph_str_hash(dci->i_dir_layout.dl_dir_hash,
dn->d_name.name, dn->d_name.len);
}
}
const struct file_operations ceph_dir_fops = { const struct file_operations ceph_dir_fops = {
.read = ceph_read_dir, .read = ceph_read_dir,
.readdir = ceph_readdir, .readdir = ceph_readdir,
......
...@@ -59,7 +59,7 @@ static int ceph_encode_fh(struct dentry *dentry, u32 *rawfh, int *max_len, ...@@ -59,7 +59,7 @@ static int ceph_encode_fh(struct dentry *dentry, u32 *rawfh, int *max_len,
dout("encode_fh %p connectable\n", dentry); dout("encode_fh %p connectable\n", dentry);
cfh->ino = ceph_ino(dentry->d_inode); cfh->ino = ceph_ino(dentry->d_inode);
cfh->parent_ino = ceph_ino(parent->d_inode); cfh->parent_ino = ceph_ino(parent->d_inode);
cfh->parent_name_hash = parent->d_name.hash; cfh->parent_name_hash = ceph_dentry_hash(parent);
*max_len = connected_handle_length; *max_len = connected_handle_length;
type = 2; type = 2;
} else if (*max_len >= handle_length) { } else if (*max_len >= handle_length) {
......
...@@ -297,6 +297,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ...@@ -297,6 +297,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_release_count = 0; ci->i_release_count = 0;
ci->i_symlink = NULL; ci->i_symlink = NULL;
memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
ci->i_fragtree = RB_ROOT; ci->i_fragtree = RB_ROOT;
mutex_init(&ci->i_fragtree_mutex); mutex_init(&ci->i_fragtree_mutex);
...@@ -689,6 +691,8 @@ static int fill_inode(struct inode *inode, ...@@ -689,6 +691,8 @@ static int fill_inode(struct inode *inode,
inode->i_op = &ceph_dir_iops; inode->i_op = &ceph_dir_iops;
inode->i_fop = &ceph_dir_fops; inode->i_fop = &ceph_dir_fops;
ci->i_dir_layout = iinfo->dir_layout;
ci->i_files = le64_to_cpu(info->files); ci->i_files = le64_to_cpu(info->files);
ci->i_subdirs = le64_to_cpu(info->subdirs); ci->i_subdirs = le64_to_cpu(info->subdirs);
ci->i_rbytes = le64_to_cpu(info->rbytes); ci->i_rbytes = le64_to_cpu(info->rbytes);
......
...@@ -60,7 +60,8 @@ static const struct ceph_connection_operations mds_con_ops; ...@@ -60,7 +60,8 @@ static const struct ceph_connection_operations mds_con_ops;
* parse individual inode info * parse individual inode info
*/ */
static int parse_reply_info_in(void **p, void *end, static int parse_reply_info_in(void **p, void *end,
struct ceph_mds_reply_info_in *info) struct ceph_mds_reply_info_in *info,
int features)
{ {
int err = -EIO; int err = -EIO;
...@@ -74,6 +75,12 @@ static int parse_reply_info_in(void **p, void *end, ...@@ -74,6 +75,12 @@ static int parse_reply_info_in(void **p, void *end,
info->symlink = *p; info->symlink = *p;
*p += info->symlink_len; *p += info->symlink_len;
if (features & CEPH_FEATURE_DIRLAYOUTHASH)
ceph_decode_copy_safe(p, end, &info->dir_layout,
sizeof(info->dir_layout), bad);
else
memset(&info->dir_layout, 0, sizeof(info->dir_layout));
ceph_decode_32_safe(p, end, info->xattr_len, bad); ceph_decode_32_safe(p, end, info->xattr_len, bad);
ceph_decode_need(p, end, info->xattr_len, bad); ceph_decode_need(p, end, info->xattr_len, bad);
info->xattr_data = *p; info->xattr_data = *p;
...@@ -88,12 +95,13 @@ static int parse_reply_info_in(void **p, void *end, ...@@ -88,12 +95,13 @@ static int parse_reply_info_in(void **p, void *end,
* target inode. * target inode.
*/ */
static int parse_reply_info_trace(void **p, void *end, static int parse_reply_info_trace(void **p, void *end,
struct ceph_mds_reply_info_parsed *info) struct ceph_mds_reply_info_parsed *info,
int features)
{ {
int err; int err;
if (info->head->is_dentry) { if (info->head->is_dentry) {
err = parse_reply_info_in(p, end, &info->diri); err = parse_reply_info_in(p, end, &info->diri, features);
if (err < 0) if (err < 0)
goto out_bad; goto out_bad;
...@@ -114,7 +122,7 @@ static int parse_reply_info_trace(void **p, void *end, ...@@ -114,7 +122,7 @@ static int parse_reply_info_trace(void **p, void *end,
} }
if (info->head->is_target) { if (info->head->is_target) {
err = parse_reply_info_in(p, end, &info->targeti); err = parse_reply_info_in(p, end, &info->targeti, features);
if (err < 0) if (err < 0)
goto out_bad; goto out_bad;
} }
...@@ -134,7 +142,8 @@ static int parse_reply_info_trace(void **p, void *end, ...@@ -134,7 +142,8 @@ static int parse_reply_info_trace(void **p, void *end,
* parse readdir results * parse readdir results
*/ */
static int parse_reply_info_dir(void **p, void *end, static int parse_reply_info_dir(void **p, void *end,
struct ceph_mds_reply_info_parsed *info) struct ceph_mds_reply_info_parsed *info,
int features)
{ {
u32 num, i = 0; u32 num, i = 0;
int err; int err;
...@@ -182,7 +191,7 @@ static int parse_reply_info_dir(void **p, void *end, ...@@ -182,7 +191,7 @@ static int parse_reply_info_dir(void **p, void *end,
*p += sizeof(struct ceph_mds_reply_lease); *p += sizeof(struct ceph_mds_reply_lease);
/* inode */ /* inode */
err = parse_reply_info_in(p, end, &info->dir_in[i]); err = parse_reply_info_in(p, end, &info->dir_in[i], features);
if (err < 0) if (err < 0)
goto out_bad; goto out_bad;
i++; i++;
...@@ -205,7 +214,8 @@ static int parse_reply_info_dir(void **p, void *end, ...@@ -205,7 +214,8 @@ static int parse_reply_info_dir(void **p, void *end,
* parse fcntl F_GETLK results * parse fcntl F_GETLK results
*/ */
static int parse_reply_info_filelock(void **p, void *end, static int parse_reply_info_filelock(void **p, void *end,
struct ceph_mds_reply_info_parsed *info) struct ceph_mds_reply_info_parsed *info,
int features)
{ {
if (*p + sizeof(*info->filelock_reply) > end) if (*p + sizeof(*info->filelock_reply) > end)
goto bad; goto bad;
...@@ -225,19 +235,21 @@ static int parse_reply_info_filelock(void **p, void *end, ...@@ -225,19 +235,21 @@ static int parse_reply_info_filelock(void **p, void *end,
* parse extra results * parse extra results
*/ */
static int parse_reply_info_extra(void **p, void *end, static int parse_reply_info_extra(void **p, void *end,
struct ceph_mds_reply_info_parsed *info) struct ceph_mds_reply_info_parsed *info,
int features)
{ {
if (info->head->op == CEPH_MDS_OP_GETFILELOCK) if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
return parse_reply_info_filelock(p, end, info); return parse_reply_info_filelock(p, end, info, features);
else else
return parse_reply_info_dir(p, end, info); return parse_reply_info_dir(p, end, info, features);
} }
/* /*
* parse entire mds reply * parse entire mds reply
*/ */
static int parse_reply_info(struct ceph_msg *msg, static int parse_reply_info(struct ceph_msg *msg,
struct ceph_mds_reply_info_parsed *info) struct ceph_mds_reply_info_parsed *info,
int features)
{ {
void *p, *end; void *p, *end;
u32 len; u32 len;
...@@ -250,7 +262,7 @@ static int parse_reply_info(struct ceph_msg *msg, ...@@ -250,7 +262,7 @@ static int parse_reply_info(struct ceph_msg *msg,
/* trace */ /* trace */
ceph_decode_32_safe(&p, end, len, bad); ceph_decode_32_safe(&p, end, len, bad);
if (len > 0) { if (len > 0) {
err = parse_reply_info_trace(&p, p+len, info); err = parse_reply_info_trace(&p, p+len, info, features);
if (err < 0) if (err < 0)
goto out_bad; goto out_bad;
} }
...@@ -258,7 +270,7 @@ static int parse_reply_info(struct ceph_msg *msg, ...@@ -258,7 +270,7 @@ static int parse_reply_info(struct ceph_msg *msg,
/* extra */ /* extra */
ceph_decode_32_safe(&p, end, len, bad); ceph_decode_32_safe(&p, end, len, bad);
if (len > 0) { if (len > 0) {
err = parse_reply_info_extra(&p, p+len, info); err = parse_reply_info_extra(&p, p+len, info, features);
if (err < 0) if (err < 0)
goto out_bad; goto out_bad;
} }
...@@ -654,7 +666,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, ...@@ -654,7 +666,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
} else { } else {
/* dir + name */ /* dir + name */
inode = dir; inode = dir;
hash = req->r_dentry->d_name.hash; hash = ceph_dentry_hash(req->r_dentry);
is_hash = true; is_hash = true;
} }
} }
...@@ -1693,7 +1705,6 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, ...@@ -1693,7 +1705,6 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
struct ceph_msg *msg; struct ceph_msg *msg;
int flags = 0; int flags = 0;
req->r_mds = mds;
req->r_attempts++; req->r_attempts++;
if (req->r_inode) { if (req->r_inode) {
struct ceph_cap *cap = struct ceph_cap *cap =
...@@ -1780,6 +1791,8 @@ static int __do_request(struct ceph_mds_client *mdsc, ...@@ -1780,6 +1791,8 @@ static int __do_request(struct ceph_mds_client *mdsc,
goto finish; goto finish;
} }
put_request_session(req);
mds = __choose_mds(mdsc, req); mds = __choose_mds(mdsc, req);
if (mds < 0 || if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
...@@ -1797,6 +1810,8 @@ static int __do_request(struct ceph_mds_client *mdsc, ...@@ -1797,6 +1810,8 @@ static int __do_request(struct ceph_mds_client *mdsc,
goto finish; goto finish;
} }
} }
req->r_session = get_session(session);
dout("do_request mds%d session %p state %s\n", mds, session, dout("do_request mds%d session %p state %s\n", mds, session,
session_state_name(session->s_state)); session_state_name(session->s_state));
if (session->s_state != CEPH_MDS_SESSION_OPEN && if (session->s_state != CEPH_MDS_SESSION_OPEN &&
...@@ -1809,7 +1824,6 @@ static int __do_request(struct ceph_mds_client *mdsc, ...@@ -1809,7 +1824,6 @@ static int __do_request(struct ceph_mds_client *mdsc,
} }
/* send request */ /* send request */
req->r_session = get_session(session);
req->r_resend_mds = -1; /* forget any previous mds hint */ req->r_resend_mds = -1; /* forget any previous mds hint */
if (req->r_request_started == 0) /* note request start time */ if (req->r_request_started == 0) /* note request start time */
...@@ -1863,7 +1877,6 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds) ...@@ -1863,7 +1877,6 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
if (req->r_session && if (req->r_session &&
req->r_session->s_mds == mds) { req->r_session->s_mds == mds) {
dout(" kicking tid %llu\n", req->r_tid); dout(" kicking tid %llu\n", req->r_tid);
put_request_session(req);
__do_request(mdsc, req); __do_request(mdsc, req);
} }
} }
...@@ -2056,8 +2069,11 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -2056,8 +2069,11 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
goto out; goto out;
} else { } else {
struct ceph_inode_info *ci = ceph_inode(req->r_inode); struct ceph_inode_info *ci = ceph_inode(req->r_inode);
struct ceph_cap *cap = struct ceph_cap *cap = NULL;
ceph_get_cap_for_mds(ci, req->r_mds);;
if (req->r_session)
cap = ceph_get_cap_for_mds(ci,
req->r_session->s_mds);
dout("already using auth"); dout("already using auth");
if ((!cap || cap != ci->i_auth_cap) || if ((!cap || cap != ci->i_auth_cap) ||
...@@ -2101,7 +2117,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -2101,7 +2117,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
dout("handle_reply tid %lld result %d\n", tid, result); dout("handle_reply tid %lld result %d\n", tid, result);
rinfo = &req->r_reply_info; rinfo = &req->r_reply_info;
err = parse_reply_info(msg, rinfo); err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex); mutex_lock(&session->s_mutex);
......
...@@ -35,6 +35,7 @@ struct ceph_cap; ...@@ -35,6 +35,7 @@ struct ceph_cap;
*/ */
struct ceph_mds_reply_info_in { struct ceph_mds_reply_info_in {
struct ceph_mds_reply_inode *in; struct ceph_mds_reply_inode *in;
struct ceph_dir_layout dir_layout;
u32 symlink_len; u32 symlink_len;
char *symlink; char *symlink;
u32 xattr_len; u32 xattr_len;
...@@ -165,7 +166,6 @@ struct ceph_mds_request { ...@@ -165,7 +166,6 @@ struct ceph_mds_request {
struct ceph_mds_client *r_mdsc; struct ceph_mds_client *r_mdsc;
int r_op; /* mds op code */ int r_op; /* mds op code */
int r_mds;
/* operation on what? */ /* operation on what? */
struct inode *r_inode; /* arg1 */ struct inode *r_inode; /* arg1 */
......
...@@ -428,7 +428,8 @@ struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, ...@@ -428,7 +428,8 @@ struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
goto fail; goto fail;
} }
fsc->client->extra_mon_dispatch = extra_mon_dispatch; fsc->client->extra_mon_dispatch = extra_mon_dispatch;
fsc->client->supported_features |= CEPH_FEATURE_FLOCK; fsc->client->supported_features |= CEPH_FEATURE_FLOCK |
CEPH_FEATURE_DIRLAYOUTHASH;
fsc->client->monc.want_mdsmap = 1; fsc->client->monc.want_mdsmap = 1;
fsc->mount_options = fsopt; fsc->mount_options = fsopt;
...@@ -443,13 +444,17 @@ struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, ...@@ -443,13 +444,17 @@ struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
goto fail_client; goto fail_client;
err = -ENOMEM; err = -ENOMEM;
fsc->wb_wq = create_workqueue("ceph-writeback"); /*
* The number of concurrent works can be high but they don't need
* to be processed in parallel, limit concurrency.
*/
fsc->wb_wq = alloc_workqueue("ceph-writeback", 0, 1);
if (fsc->wb_wq == NULL) if (fsc->wb_wq == NULL)
goto fail_bdi; goto fail_bdi;
fsc->pg_inv_wq = create_singlethread_workqueue("ceph-pg-invalid"); fsc->pg_inv_wq = alloc_workqueue("ceph-pg-invalid", 0, 1);
if (fsc->pg_inv_wq == NULL) if (fsc->pg_inv_wq == NULL)
goto fail_wb_wq; goto fail_wb_wq;
fsc->trunc_wq = create_singlethread_workqueue("ceph-trunc"); fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
if (fsc->trunc_wq == NULL) if (fsc->trunc_wq == NULL)
goto fail_pg_inv_wq; goto fail_pg_inv_wq;
......
...@@ -239,6 +239,7 @@ struct ceph_inode_info { ...@@ -239,6 +239,7 @@ struct ceph_inode_info {
unsigned i_ceph_flags; unsigned i_ceph_flags;
unsigned long i_release_count; unsigned long i_release_count;
struct ceph_dir_layout i_dir_layout;
struct ceph_file_layout i_layout; struct ceph_file_layout i_layout;
char *i_symlink; char *i_symlink;
...@@ -768,6 +769,7 @@ extern void ceph_dentry_lru_add(struct dentry *dn); ...@@ -768,6 +769,7 @@ extern void ceph_dentry_lru_add(struct dentry *dn);
extern void ceph_dentry_lru_touch(struct dentry *dn); extern void ceph_dentry_lru_touch(struct dentry *dn);
extern void ceph_dentry_lru_del(struct dentry *dn); extern void ceph_dentry_lru_del(struct dentry *dn);
extern void ceph_invalidate_dentry_lease(struct dentry *dentry); extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
extern unsigned ceph_dentry_hash(struct dentry *dn);
/* /*
* our d_ops vary depending on whether the inode is live, * our d_ops vary depending on whether the inode is live,
......
...@@ -43,6 +43,10 @@ ...@@ -43,6 +43,10 @@
#define CEPH_FEATURE_NOSRCADDR (1<<1) #define CEPH_FEATURE_NOSRCADDR (1<<1)
#define CEPH_FEATURE_MONCLOCKCHECK (1<<2) #define CEPH_FEATURE_MONCLOCKCHECK (1<<2)
#define CEPH_FEATURE_FLOCK (1<<3) #define CEPH_FEATURE_FLOCK (1<<3)
#define CEPH_FEATURE_SUBSCRIBE2 (1<<4)
#define CEPH_FEATURE_MONNAMES (1<<5)
#define CEPH_FEATURE_RECONNECT_SEQ (1<<6)
#define CEPH_FEATURE_DIRLAYOUTHASH (1<<7)
/* /*
...@@ -55,10 +59,10 @@ struct ceph_file_layout { ...@@ -55,10 +59,10 @@ struct ceph_file_layout {
__le32 fl_stripe_count; /* over this many objects */ __le32 fl_stripe_count; /* over this many objects */
__le32 fl_object_size; /* until objects are this big, then move to __le32 fl_object_size; /* until objects are this big, then move to
new objects */ new objects */
__le32 fl_cas_hash; /* 0 = none; 1 = sha256 */ __le32 fl_cas_hash; /* UNUSED. 0 = none; 1 = sha256 */
/* pg -> disk layout */ /* pg -> disk layout */
__le32 fl_object_stripe_unit; /* for per-object parity, if any */ __le32 fl_object_stripe_unit; /* UNUSED. for per-object parity, if any */
/* object -> pg layout */ /* object -> pg layout */
__le32 fl_pg_preferred; /* preferred primary for pg (-1 for none) */ __le32 fl_pg_preferred; /* preferred primary for pg (-1 for none) */
...@@ -69,6 +73,12 @@ struct ceph_file_layout { ...@@ -69,6 +73,12 @@ struct ceph_file_layout {
int ceph_file_layout_is_valid(const struct ceph_file_layout *layout); int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
struct ceph_dir_layout {
__u8 dl_dir_hash; /* see ceph_hash.h for ids */
__u8 dl_unused1;
__u16 dl_unused2;
__u32 dl_unused3;
} __attribute__ ((packed));
/* crypto algorithms */ /* crypto algorithms */
#define CEPH_CRYPTO_NONE 0x0 #define CEPH_CRYPTO_NONE 0x0
...@@ -457,7 +467,7 @@ struct ceph_mds_reply_inode { ...@@ -457,7 +467,7 @@ struct ceph_mds_reply_inode {
struct ceph_timespec rctime; struct ceph_timespec rctime;
struct ceph_frag_tree_head fragtree; /* (must be at end of struct) */ struct ceph_frag_tree_head fragtree; /* (must be at end of struct) */
} __attribute__ ((packed)); } __attribute__ ((packed));
/* followed by frag array, then symlink string, then xattr blob */ /* followed by frag array, symlink string, dir layout, xattr blob */
/* reply_lease follows dname, and reply_inode */ /* reply_lease follows dname, and reply_inode */
struct ceph_mds_reply_lease { struct ceph_mds_reply_lease {
......
...@@ -110,17 +110,12 @@ struct ceph_msg_pos { ...@@ -110,17 +110,12 @@ struct ceph_msg_pos {
/* /*
* ceph_connection state bit flags * ceph_connection state bit flags
*
* QUEUED and BUSY are used together to ensure that only a single
* thread is currently opening, reading or writing data to the socket.
*/ */
#define LOSSYTX 0 /* we can close channel or drop messages on errors */ #define LOSSYTX 0 /* we can close channel or drop messages on errors */
#define CONNECTING 1 #define CONNECTING 1
#define NEGOTIATING 2 #define NEGOTIATING 2
#define KEEPALIVE_PENDING 3 #define KEEPALIVE_PENDING 3
#define WRITE_PENDING 4 /* we have data ready to send */ #define WRITE_PENDING 4 /* we have data ready to send */
#define QUEUED 5 /* there is work queued on this connection */
#define BUSY 6 /* work is being done */
#define STANDBY 8 /* no outgoing messages, socket closed. we keep #define STANDBY 8 /* no outgoing messages, socket closed. we keep
* the ceph_connection around to maintain shared * the ceph_connection around to maintain shared
* state with the peer. */ * state with the peer. */
......
#include <linux/ceph/types.h> #include <linux/ceph/types.h>
#include <linux/module.h>
/* /*
* Robert Jenkin's hash function. * Robert Jenkin's hash function.
...@@ -104,6 +105,7 @@ unsigned ceph_str_hash(int type, const char *s, unsigned len) ...@@ -104,6 +105,7 @@ unsigned ceph_str_hash(int type, const char *s, unsigned len)
return -1; return -1;
} }
} }
EXPORT_SYMBOL(ceph_str_hash);
const char *ceph_str_hash_name(int type) const char *ceph_str_hash_name(int type)
{ {
...@@ -116,3 +118,4 @@ const char *ceph_str_hash_name(int type) ...@@ -116,3 +118,4 @@ const char *ceph_str_hash_name(int type)
return "unknown"; return "unknown";
} }
} }
EXPORT_SYMBOL(ceph_str_hash_name);
...@@ -96,7 +96,7 @@ struct workqueue_struct *ceph_msgr_wq; ...@@ -96,7 +96,7 @@ struct workqueue_struct *ceph_msgr_wq;
int ceph_msgr_init(void) int ceph_msgr_init(void)
{ {
ceph_msgr_wq = create_workqueue("ceph-msgr"); ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
if (!ceph_msgr_wq) { if (!ceph_msgr_wq) {
pr_err("msgr_init failed to create workqueue\n"); pr_err("msgr_init failed to create workqueue\n");
return -ENOMEM; return -ENOMEM;
...@@ -1920,20 +1920,6 @@ static int try_read(struct ceph_connection *con) ...@@ -1920,20 +1920,6 @@ static int try_read(struct ceph_connection *con)
/* /*
* Atomically queue work on a connection. Bump @con reference to * Atomically queue work on a connection. Bump @con reference to
* avoid races with connection teardown. * avoid races with connection teardown.
*
* There is some trickery going on with QUEUED and BUSY because we
* only want a _single_ thread operating on each connection at any
* point in time, but we want to use all available CPUs.
*
* The worker thread only proceeds if it can atomically set BUSY. It
* clears QUEUED and does it's thing. When it thinks it's done, it
* clears BUSY, then rechecks QUEUED.. if it's set again, it loops
* (tries again to set BUSY).
*
* To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
* try to queue work. If that fails (work is already queued, or BUSY)
* we give up (work also already being done or is queued) but leave QUEUED
* set so that the worker thread will loop if necessary.
*/ */
static void queue_con(struct ceph_connection *con) static void queue_con(struct ceph_connection *con)
{ {
...@@ -1948,11 +1934,7 @@ static void queue_con(struct ceph_connection *con) ...@@ -1948,11 +1934,7 @@ static void queue_con(struct ceph_connection *con)
return; return;
} }
set_bit(QUEUED, &con->state); if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
if (test_bit(BUSY, &con->state)) {
dout("queue_con %p - already BUSY\n", con);
con->ops->put(con);
} else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
dout("queue_con %p - already queued\n", con); dout("queue_con %p - already queued\n", con);
con->ops->put(con); con->ops->put(con);
} else { } else {
...@@ -1967,15 +1949,6 @@ static void con_work(struct work_struct *work) ...@@ -1967,15 +1949,6 @@ static void con_work(struct work_struct *work)
{ {
struct ceph_connection *con = container_of(work, struct ceph_connection, struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work); work.work);
int backoff = 0;
more:
if (test_and_set_bit(BUSY, &con->state) != 0) {
dout("con_work %p BUSY already set\n", con);
goto out;
}
dout("con_work %p start, clearing QUEUED\n", con);
clear_bit(QUEUED, &con->state);
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
...@@ -1994,28 +1967,13 @@ static void con_work(struct work_struct *work) ...@@ -1994,28 +1967,13 @@ static void con_work(struct work_struct *work)
try_read(con) < 0 || try_read(con) < 0 ||
try_write(con) < 0) { try_write(con) < 0) {
mutex_unlock(&con->mutex); mutex_unlock(&con->mutex);
backoff = 1;
ceph_fault(con); /* error/fault path */ ceph_fault(con); /* error/fault path */
goto done_unlocked; goto done_unlocked;
} }
done: done:
mutex_unlock(&con->mutex); mutex_unlock(&con->mutex);
done_unlocked: done_unlocked:
clear_bit(BUSY, &con->state);
dout("con->state=%lu\n", con->state);
if (test_bit(QUEUED, &con->state)) {
if (!backoff || test_bit(OPENING, &con->state)) {
dout("con_work %p QUEUED reset, looping\n", con);
goto more;
}
dout("con_work %p QUEUED reset, but just faulted\n", con);
clear_bit(QUEUED, &con->state);
}
dout("con_work %p done\n", con);
out:
con->ops->put(con); con->ops->put(con);
} }
......
...@@ -605,8 +605,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ...@@ -605,8 +605,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
goto bad; goto bad;
} }
err = __decode_pool(p, end, pi); err = __decode_pool(p, end, pi);
if (err < 0) if (err < 0) {
kfree(pi);
goto bad; goto bad;
}
__insert_pg_pool(&map->pg_pools, pi); __insert_pg_pool(&map->pg_pools, pi);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册