提交 430afbad 编写于 作者: Y Yan, Zheng 提交者: Ilya Dryomov

ceph: mount non-default filesystem by name

To mount non-default filesytem, user currently needs to provide mds
namespace ID. This is inconvenience.

This patch makes user be able to mount filesystem by name. If user
wants to mount non-default filesystem. Client first subscribes to
fsmap.user. Subscribe to mdsmap.<ID> after getting ID of filesystem.
Signed-off-by: NYan, Zheng <zyan@redhat.com>
上级 0cabbd94
...@@ -2166,6 +2166,11 @@ static int __do_request(struct ceph_mds_client *mdsc, ...@@ -2166,6 +2166,11 @@ static int __do_request(struct ceph_mds_client *mdsc,
mds = __choose_mds(mdsc, req); mds = __choose_mds(mdsc, req);
if (mds < 0 || if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
if (mdsc->mdsmap_err) {
err = mdsc->mdsmap_err;
dout("do_request mdsmap err %d\n", err);
goto finish;
}
dout("do_request no mds or not active, waiting for map\n"); dout("do_request no mds or not active, waiting for map\n");
list_add(&req->r_wait, &mdsc->waiting_for_map); list_add(&req->r_wait, &mdsc->waiting_for_map);
goto out; goto out;
...@@ -3683,11 +3688,86 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc) ...@@ -3683,11 +3688,86 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
dout("mdsc_destroy %p done\n", mdsc); dout("mdsc_destroy %p done\n", mdsc);
} }
void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
struct ceph_fs_client *fsc = mdsc->fsc;
const char *mds_namespace = fsc->mount_options->mds_namespace;
void *p = msg->front.iov_base;
void *end = p + msg->front.iov_len;
u32 epoch;
u32 map_len;
u32 num_fs;
u32 mount_fscid = (u32)-1;
u8 struct_v, struct_cv;
int err = -EINVAL;
ceph_decode_need(&p, end, sizeof(u32), bad);
epoch = ceph_decode_32(&p);
dout("handle_fsmap epoch %u\n", epoch);
ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
struct_v = ceph_decode_8(&p);
struct_cv = ceph_decode_8(&p);
map_len = ceph_decode_32(&p);
ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
num_fs = ceph_decode_32(&p);
while (num_fs-- > 0) {
void *info_p, *info_end;
u32 info_len;
u8 info_v, info_cv;
u32 fscid, namelen;
ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
info_v = ceph_decode_8(&p);
info_cv = ceph_decode_8(&p);
info_len = ceph_decode_32(&p);
ceph_decode_need(&p, end, info_len, bad);
info_p = p;
info_end = p + info_len;
p = info_end;
ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
fscid = ceph_decode_32(&info_p);
namelen = ceph_decode_32(&info_p);
ceph_decode_need(&info_p, info_end, namelen, bad);
if (mds_namespace &&
strlen(mds_namespace) == namelen &&
!strncmp(mds_namespace, (char *)info_p, namelen)) {
mount_fscid = fscid;
break;
}
}
ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
if (mount_fscid != (u32)-1) {
fsc->client->monc.fs_cluster_id = mount_fscid;
ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
0, true);
ceph_monc_renew_subs(&fsc->client->monc);
} else {
err = -ENOENT;
goto err_out;
}
return;
bad:
pr_err("error decoding fsmap\n");
err_out:
mutex_lock(&mdsc->mutex);
mdsc->mdsmap_err = -ENOENT;
__wake_requests(mdsc, &mdsc->waiting_for_map);
mutex_unlock(&mdsc->mutex);
return;
}
/* /*
* handle mds map update. * handle mds map update.
*/ */
void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{ {
u32 epoch; u32 epoch;
u32 maplen; u32 maplen;
...@@ -3794,7 +3874,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -3794,7 +3874,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
switch (type) { switch (type) {
case CEPH_MSG_MDS_MAP: case CEPH_MSG_MDS_MAP:
ceph_mdsc_handle_map(mdsc, msg); ceph_mdsc_handle_mdsmap(mdsc, msg);
break;
case CEPH_MSG_FS_MAP_USER:
ceph_mdsc_handle_fsmap(mdsc, msg);
break; break;
case CEPH_MSG_CLIENT_SESSION: case CEPH_MSG_CLIENT_SESSION:
handle_session(s, msg); handle_session(s, msg);
......
...@@ -293,6 +293,7 @@ struct ceph_mds_client { ...@@ -293,6 +293,7 @@ struct ceph_mds_client {
struct completion safe_umount_waiters; struct completion safe_umount_waiters;
wait_queue_head_t session_close_wq; wait_queue_head_t session_close_wq;
struct list_head waiting_for_map; struct list_head waiting_for_map;
int mdsmap_err;
struct ceph_mds_session **sessions; /* NULL for mds if no session */ struct ceph_mds_session **sessions; /* NULL for mds if no session */
atomic_t num_sessions; atomic_t num_sessions;
...@@ -419,8 +420,10 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, ...@@ -419,8 +420,10 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
struct dentry *dentry, char action, struct dentry *dentry, char action,
u32 seq); u32 seq);
extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, extern void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc,
struct ceph_msg *msg); struct ceph_msg *msg);
extern void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
extern struct ceph_mds_session * extern struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target); ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
......
...@@ -108,7 +108,6 @@ static int ceph_sync_fs(struct super_block *sb, int wait) ...@@ -108,7 +108,6 @@ static int ceph_sync_fs(struct super_block *sb, int wait)
* mount options * mount options
*/ */
enum { enum {
Opt_mds_namespace,
Opt_wsize, Opt_wsize,
Opt_rsize, Opt_rsize,
Opt_rasize, Opt_rasize,
...@@ -121,6 +120,7 @@ enum { ...@@ -121,6 +120,7 @@ enum {
Opt_last_int, Opt_last_int,
/* int args above */ /* int args above */
Opt_snapdirname, Opt_snapdirname,
Opt_mds_namespace,
Opt_last_string, Opt_last_string,
/* string args above */ /* string args above */
Opt_dirstat, Opt_dirstat,
...@@ -144,7 +144,6 @@ enum { ...@@ -144,7 +144,6 @@ enum {
}; };
static match_table_t fsopt_tokens = { static match_table_t fsopt_tokens = {
{Opt_mds_namespace, "mds_namespace=%d"},
{Opt_wsize, "wsize=%d"}, {Opt_wsize, "wsize=%d"},
{Opt_rsize, "rsize=%d"}, {Opt_rsize, "rsize=%d"},
{Opt_rasize, "rasize=%d"}, {Opt_rasize, "rasize=%d"},
...@@ -156,6 +155,7 @@ static match_table_t fsopt_tokens = { ...@@ -156,6 +155,7 @@ static match_table_t fsopt_tokens = {
{Opt_congestion_kb, "write_congestion_kb=%d"}, {Opt_congestion_kb, "write_congestion_kb=%d"},
/* int args above */ /* int args above */
{Opt_snapdirname, "snapdirname=%s"}, {Opt_snapdirname, "snapdirname=%s"},
{Opt_mds_namespace, "mds_namespace=%s"},
/* string args above */ /* string args above */
{Opt_dirstat, "dirstat"}, {Opt_dirstat, "dirstat"},
{Opt_nodirstat, "nodirstat"}, {Opt_nodirstat, "nodirstat"},
...@@ -212,11 +212,14 @@ static int parse_fsopt_token(char *c, void *private) ...@@ -212,11 +212,14 @@ static int parse_fsopt_token(char *c, void *private)
if (!fsopt->snapdir_name) if (!fsopt->snapdir_name)
return -ENOMEM; return -ENOMEM;
break; break;
/* misc */
case Opt_mds_namespace: case Opt_mds_namespace:
fsopt->mds_namespace = intval; fsopt->mds_namespace = kstrndup(argstr[0].from,
argstr[0].to-argstr[0].from,
GFP_KERNEL);
if (!fsopt->mds_namespace)
return -ENOMEM;
break; break;
/* misc */
case Opt_wsize: case Opt_wsize:
fsopt->wsize = intval; fsopt->wsize = intval;
break; break;
...@@ -302,6 +305,7 @@ static void destroy_mount_options(struct ceph_mount_options *args) ...@@ -302,6 +305,7 @@ static void destroy_mount_options(struct ceph_mount_options *args)
{ {
dout("destroy_mount_options %p\n", args); dout("destroy_mount_options %p\n", args);
kfree(args->snapdir_name); kfree(args->snapdir_name);
kfree(args->mds_namespace);
kfree(args->server_path); kfree(args->server_path);
kfree(args); kfree(args);
} }
...@@ -331,6 +335,9 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt, ...@@ -331,6 +335,9 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,
return ret; return ret;
ret = strcmp_null(fsopt1->snapdir_name, fsopt2->snapdir_name); ret = strcmp_null(fsopt1->snapdir_name, fsopt2->snapdir_name);
if (ret)
return ret;
ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace);
if (ret) if (ret)
return ret; return ret;
...@@ -376,7 +383,6 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt, ...@@ -376,7 +383,6 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT; fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT; fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
fsopt->congestion_kb = default_congestion_kb(); fsopt->congestion_kb = default_congestion_kb();
fsopt->mds_namespace = CEPH_FS_CLUSTER_ID_NONE;
/* /*
* Distinguish the server list from the path in "dev_name". * Distinguish the server list from the path in "dev_name".
...@@ -469,8 +475,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) ...@@ -469,8 +475,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_puts(m, ",noacl"); seq_puts(m, ",noacl");
#endif #endif
if (fsopt->mds_namespace != CEPH_FS_CLUSTER_ID_NONE) if (fsopt->mds_namespace)
seq_printf(m, ",mds_namespace=%d", fsopt->mds_namespace); seq_printf(m, ",mds_namespace=%s", fsopt->mds_namespace);
if (fsopt->wsize) if (fsopt->wsize)
seq_printf(m, ",wsize=%d", fsopt->wsize); seq_printf(m, ",wsize=%d", fsopt->wsize);
if (fsopt->rsize != CEPH_RSIZE_DEFAULT) if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
...@@ -509,9 +515,11 @@ static int extra_mon_dispatch(struct ceph_client *client, struct ceph_msg *msg) ...@@ -509,9 +515,11 @@ static int extra_mon_dispatch(struct ceph_client *client, struct ceph_msg *msg)
switch (type) { switch (type) {
case CEPH_MSG_MDS_MAP: case CEPH_MSG_MDS_MAP:
ceph_mdsc_handle_map(fsc->mdsc, msg); ceph_mdsc_handle_mdsmap(fsc->mdsc, msg);
return 0;
case CEPH_MSG_FS_MAP_USER:
ceph_mdsc_handle_fsmap(fsc->mdsc, msg);
return 0; return 0;
default: default:
return -1; return -1;
} }
...@@ -543,8 +551,14 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, ...@@ -543,8 +551,14 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
goto fail; goto fail;
} }
fsc->client->extra_mon_dispatch = extra_mon_dispatch; fsc->client->extra_mon_dispatch = extra_mon_dispatch;
fsc->client->monc.fs_cluster_id = fsopt->mds_namespace;
ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true); if (fsopt->mds_namespace == NULL) {
ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
0, true);
} else {
ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_FSMAP,
0, false);
}
fsc->mount_options = fsopt; fsc->mount_options = fsopt;
......
...@@ -62,7 +62,6 @@ struct ceph_mount_options { ...@@ -62,7 +62,6 @@ struct ceph_mount_options {
int cap_release_safety; int cap_release_safety;
int max_readdir; /* max readdir result (entires) */ int max_readdir; /* max readdir result (entires) */
int max_readdir_bytes; /* max readdir result (bytes) */ int max_readdir_bytes; /* max readdir result (bytes) */
int mds_namespace;
/* /*
* everything above this point can be memcmp'd; everything below * everything above this point can be memcmp'd; everything below
...@@ -70,6 +69,7 @@ struct ceph_mount_options { ...@@ -70,6 +69,7 @@ struct ceph_mount_options {
*/ */
char *snapdir_name; /* default ".snap" */ char *snapdir_name; /* default ".snap" */
char *mds_namespace; /* default NULL */
char *server_path; /* default "/" */ char *server_path; /* default "/" */
}; };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册