提交 0eefd470 编写于 作者: A Alex Elder 提交者: Sage Weil

rbd: issue a copyup for layered writes

This implements the main copyup functionality for layered writes.

Here we add a copyup_pages field to the object request, which is
used only for copyup requests to keep track of the page array
containing data read from the parent image.

A copyup request is currently the only request rbd has that requires
two osd operations.  Because of this we handle copyup specially.
All image object requests get an osd request allocated when they are
created.  For a write request, if a copyup is required, the osd
request originally allocated is released, and a new one (with room
for two osd ops) is allocated to replace it.  A new function
rbd_osd_req_create_copyup() allocates an osd request suitable for
a copyup request.

The first op is then filled with a copyup object class method call,
supplying the array of pages containing data read from the parent.
The second op is filled in with the original write request.

The original request otherwise remains intact, and it describes the
original write request (found in the second osd op).  The presence
of the copyup op is sort of implicit; a non-null copyup_pages field
could be used to distinguish between a "normal" write request and a
request containing both a copyup call and a write.

This resolves:
    http://tracker.ceph.com/issues/3419Signed-off-by: NAlex Elder <elder@inktank.com>
Reviewed-by: NJosh Durgin <josh.durgin@inktank.com>
上级 3d7efd18
...@@ -218,6 +218,7 @@ struct rbd_obj_request { ...@@ -218,6 +218,7 @@ struct rbd_obj_request {
u32 page_count; u32 page_count;
}; };
}; };
struct page **copyup_pages;
struct ceph_osd_request *osd_req; struct ceph_osd_request *osd_req;
...@@ -1498,7 +1499,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, ...@@ -1498,7 +1499,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
obj_request->result = osd_req->r_result; obj_request->result = osd_req->r_result;
obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version); obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
WARN_ON(osd_req->r_num_ops != 1); /* For now */ BUG_ON(osd_req->r_num_ops > 2);
/* /*
* We support a 64-bit length, but ultimately it has to be * We support a 64-bit length, but ultimately it has to be
...@@ -1601,6 +1602,48 @@ static struct ceph_osd_request *rbd_osd_req_create( ...@@ -1601,6 +1602,48 @@ static struct ceph_osd_request *rbd_osd_req_create(
return osd_req; return osd_req;
} }
/*
* Create a copyup osd request based on the information in the
* object request supplied. A copyup request has two osd ops,
* a copyup method call, and a "normal" write request.
*/
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request;
struct ceph_snap_context *snapc;
struct rbd_device *rbd_dev;
struct ceph_osd_client *osdc;
struct ceph_osd_request *osd_req;
rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;
rbd_assert(img_request);
rbd_assert(img_request_write_test(img_request));
/* Allocate and initialize the request, for the two ops */
snapc = img_request->snapc;
rbd_dev = img_request->rbd_dev;
osdc = &rbd_dev->rbd_client->client->osdc;
osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
if (!osd_req)
return NULL; /* ENOMEM */
osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
osd_req->r_callback = rbd_osd_req_callback;
osd_req->r_priv = obj_request;
osd_req->r_oid_len = strlen(obj_request->object_name);
rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
osd_req->r_file_layout = rbd_dev->layout; /* struct */
return osd_req;
}
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{ {
ceph_osdc_put_request(osd_req); ceph_osdc_put_request(osd_req);
...@@ -1959,12 +2002,50 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, ...@@ -1959,12 +2002,50 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
return -ENOMEM; return -ENOMEM;
} }
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request;
struct rbd_device *rbd_dev;
u64 length;
u32 page_count;
rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;
rbd_assert(img_request);
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev);
length = (u64)1 << rbd_dev->header.obj_order;
page_count = (u32)calc_pages_for(0, length);
rbd_assert(obj_request->copyup_pages);
ceph_release_page_vector(obj_request->copyup_pages, page_count);
obj_request->copyup_pages = NULL;
/*
* We want the transfer count to reflect the size of the
* original write request. There is no such thing as a
* successful short write, so if the request was successful
* we can just set it to the originally-requested length.
*/
if (!obj_request->result)
obj_request->xferred = obj_request->length;
/* Finish up with the normal image object callback */
rbd_img_obj_callback(obj_request);
}
static void static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{ {
struct rbd_obj_request *orig_request; struct rbd_obj_request *orig_request;
struct ceph_osd_request *osd_req;
struct ceph_osd_client *osdc;
struct rbd_device *rbd_dev;
struct page **pages; struct page **pages;
u32 page_count;
int result; int result;
u64 obj_size; u64 obj_size;
u64 xferred; u64 xferred;
...@@ -1979,25 +2060,60 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) ...@@ -1979,25 +2060,60 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
orig_request = img_request->obj_request; orig_request = img_request->obj_request;
rbd_assert(orig_request != NULL); rbd_assert(orig_request != NULL);
rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
result = img_request->result; result = img_request->result;
obj_size = img_request->length; obj_size = img_request->length;
xferred = img_request->xferred; xferred = img_request->xferred;
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev);
rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
rbd_img_request_put(img_request); rbd_img_request_put(img_request);
obj_request_existence_set(orig_request, true); if (result)
goto out_err;
/* Allocate the new copyup osd request for the original request */
page_count = (u32)calc_pages_for(0, obj_size); result = -ENOMEM;
ceph_release_page_vector(pages, page_count); rbd_assert(!orig_request->osd_req);
osd_req = rbd_osd_req_create_copyup(orig_request);
if (!osd_req)
goto out_err;
orig_request->osd_req = osd_req;
orig_request->copyup_pages = pages;
/* Resubmit the original request (for now). */ /* Initialize the copyup op */
orig_request->result = rbd_img_obj_request_submit(orig_request); osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
if (orig_request->result) { osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
obj_request_done_set(orig_request); false, false);
rbd_obj_request_complete(orig_request);
} /* Then the original write request op */
osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
orig_request->offset,
orig_request->length, 0, 0);
osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
orig_request->length);
rbd_osd_req_format_write(orig_request);
/* All set, send it off. */
orig_request->callback = rbd_img_obj_copyup_callback;
osdc = &rbd_dev->rbd_client->client->osdc;
result = rbd_obj_request_submit(osdc, orig_request);
if (!result)
return;
out_err:
/* Record the error code and complete the request */
orig_request->result = result;
orig_request->xferred = 0;
obj_request_done_set(orig_request);
rbd_obj_request_complete(orig_request);
} }
/* /*
...@@ -2033,6 +2149,15 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) ...@@ -2033,6 +2149,15 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
rbd_dev = img_request->rbd_dev; rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev->parent != NULL); rbd_assert(rbd_dev->parent != NULL);
/*
* First things first. The original osd request is of no
* use to use any more, we'll need a new one that can hold
* the two ops in a copyup request. We'll get that later,
* but for now we can release the old one.
*/
rbd_osd_req_destroy(obj_request->osd_req);
obj_request->osd_req = NULL;
/* /*
* Determine the byte range covered by the object in the * Determine the byte range covered by the object in the
* child image to which the original request was to be sent. * child image to which the original request was to be sent.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册