提交 205ee118 编写于 作者: I Ilya Dryomov

libceph: follow redirect replies from osds

Follow redirect replies from osds, for details see ceph.git commit
fbbe3ad1220799b7bb00ea30fce581c5eadaf034.

v1 (current) version of redirect reply consists of oloc and oid, which
expands to pool, key, nspace, hash and oid.  However, server-side code
that would populate anything other than pool doesn't exist yet, and
hence this commit adds support for pool redirects only.  To make sure
that future server-side updates don't break us, we decode all fields
and, if any of key, nspace, hash or oid have a non-default value, error
out with "corrupt osd_op_reply ..." message.
Signed-off-by: NIlya Dryomov <ilya.dryomov@inktank.com>
Reviewed-by: NSage Weil <sage@inktank.com>
上级 3c972c95
......@@ -155,6 +155,8 @@ struct ceph_osd_request {
struct ceph_object_locator r_base_oloc;
struct ceph_object_id r_base_oid;
struct ceph_object_locator r_target_oloc;
struct ceph_object_id r_target_oid;
u64 r_snapid;
unsigned long r_stamp; /* send OR check time */
......@@ -162,6 +164,10 @@ struct ceph_osd_request {
struct ceph_snap_context *r_snapc; /* snap context for writes */
};
struct ceph_request_redirect {
struct ceph_object_locator oloc;
};
struct ceph_osd_event {
u64 cookie;
int one_shot;
......
......@@ -369,6 +369,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
INIT_LIST_HEAD(&req->r_osd_item);
req->r_base_oloc.pool = -1;
req->r_target_oloc.pool = -1;
/* create reply message */
if (use_mempool)
......@@ -1256,23 +1257,36 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap,
struct ceph_osd_request *req,
struct ceph_pg *pg_out)
{
if ((req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
bool need_check_tiering;
need_check_tiering = false;
if (req->r_target_oloc.pool == -1) {
req->r_target_oloc = req->r_base_oloc; /* struct */
need_check_tiering = true;
}
if (req->r_target_oid.name_len == 0) {
ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
need_check_tiering = true;
}
if (need_check_tiering &&
(req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
struct ceph_pg_pool_info *pi;
pi = ceph_pg_pool_by_id(osdmap, req->r_base_oloc.pool);
pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
if (pi) {
if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
pi->read_tier >= 0)
req->r_base_oloc.pool = pi->read_tier;
req->r_target_oloc.pool = pi->read_tier;
if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
pi->write_tier >= 0)
req->r_base_oloc.pool = pi->write_tier;
req->r_target_oloc.pool = pi->write_tier;
}
/* !pi is caught in ceph_oloc_oid_to_pg() */
}
return ceph_oloc_oid_to_pg(osdmap, &req->r_base_oloc,
&req->r_base_oid, pg_out);
return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
&req->r_target_oid, pg_out);
}
/*
......@@ -1382,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc,
/* fill in message content that changes each time we send it */
put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
put_unaligned_le32(req->r_flags, req->r_request_flags);
put_unaligned_le64(req->r_base_oloc.pool, req->r_request_pool);
put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
p = req->r_request_pgid;
ceph_encode_64(&p, req->r_pgid.pool);
ceph_encode_32(&p, req->r_pgid.seed);
......@@ -1483,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work)
round_jiffies_relative(delay));
}
static int ceph_oloc_decode(void **p, void *end,
struct ceph_object_locator *oloc)
{
u8 struct_v, struct_cv;
u32 len;
void *struct_end;
int ret = 0;
ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
struct_v = ceph_decode_8(p);
struct_cv = ceph_decode_8(p);
if (struct_v < 3) {
pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
struct_v, struct_cv);
goto e_inval;
}
if (struct_cv > 6) {
pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
struct_v, struct_cv);
goto e_inval;
}
len = ceph_decode_32(p);
ceph_decode_need(p, end, len, e_inval);
struct_end = *p + len;
oloc->pool = ceph_decode_64(p);
*p += 4; /* skip preferred */
len = ceph_decode_32(p);
if (len > 0) {
pr_warn("ceph_object_locator::key is set\n");
goto e_inval;
}
if (struct_v >= 5) {
len = ceph_decode_32(p);
if (len > 0) {
pr_warn("ceph_object_locator::nspace is set\n");
goto e_inval;
}
}
if (struct_v >= 6) {
s64 hash = ceph_decode_64(p);
if (hash != -1) {
pr_warn("ceph_object_locator::hash is set\n");
goto e_inval;
}
}
/* skip the rest */
*p = struct_end;
out:
return ret;
e_inval:
ret = -EINVAL;
goto out;
}
static int ceph_redirect_decode(void **p, void *end,
struct ceph_request_redirect *redir)
{
u8 struct_v, struct_cv;
u32 len;
void *struct_end;
int ret;
ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
struct_v = ceph_decode_8(p);
struct_cv = ceph_decode_8(p);
if (struct_cv > 1) {
pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
struct_v, struct_cv);
goto e_inval;
}
len = ceph_decode_32(p);
ceph_decode_need(p, end, len, e_inval);
struct_end = *p + len;
ret = ceph_oloc_decode(p, end, &redir->oloc);
if (ret)
goto out;
len = ceph_decode_32(p);
if (len > 0) {
pr_warn("ceph_request_redirect::object_name is set\n");
goto e_inval;
}
len = ceph_decode_32(p);
*p += len; /* skip osd_instructions */
/* skip the rest */
*p = struct_end;
out:
return ret;
e_inval:
ret = -EINVAL;
goto out;
}
static void complete_request(struct ceph_osd_request *req)
{
complete_all(&req->r_safe_completion); /* fsync waiter */
......@@ -1497,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
{
void *p, *end;
struct ceph_osd_request *req;
struct ceph_request_redirect redir;
u64 tid;
int object_len;
unsigned int numops;
......@@ -1576,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
for (i = 0; i < numops; i++)
req->r_reply_op_result[i] = ceph_decode_32(&p);
already_completed = req->r_got_reply;
if (le16_to_cpu(msg->hdr.version) >= 6) {
p += 8 + 4; /* skip replay_version */
p += 8; /* skip user_version */
if (!req->r_got_reply) {
err = ceph_redirect_decode(&p, end, &redir);
if (err)
goto bad_put;
} else {
redir.oloc.pool = -1;
}
if (redir.oloc.pool != -1) {
dout("redirect pool %lld\n", redir.oloc.pool);
__unregister_request(osdc, req);
mutex_unlock(&osdc->request_mutex);
req->r_target_oloc = redir.oloc; /* struct */
/*
* Start redirect requests with nofail=true. If
* mapping fails, request will end up on the notarget
* list, waiting for the new osdmap (which can take
* a while), even though the original request mapped
* successfully. In the future we might want to follow
* original request's nofail setting here.
*/
err = ceph_osdc_start_request(osdc, req, true);
BUG_ON(err);
goto done;
}
already_completed = req->r_got_reply;
if (!req->r_got_reply) {
req->r_result = result;
dout("handle_reply result %d bytes %d\n", req->r_result,
bytes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册