提交 3d7efd18 编写于 作者: A Alex Elder 提交者: Sage Weil

rbd: implement full object parent reads

As a step toward implementing layered writes, implement reading the
data for a target object from the parent image for a write request
whose target object is known to not exist.  Add a copyup_pages field
to an image request to track the page array used (only) for such a
request.
Signed-off-by: NAlex Elder <elder@inktank.com>
Reviewed-by: NJosh Durgin <josh.durgin@inktank.com>
上级 d98df63e
......@@ -250,6 +250,7 @@ struct rbd_img_request {
struct request *rq; /* block request */
struct rbd_obj_request *obj_request; /* obj req initiator */
};
struct page **copyup_pages;
spinlock_t completion_lock;/* protects next_completion */
u32 next_completion;
rbd_img_callback_t callback;
......@@ -350,6 +351,8 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
static LIST_HEAD(rbd_client_list); /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
static int rbd_img_request_submit(struct rbd_img_request *img_request);
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
......@@ -1956,6 +1959,133 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
return -ENOMEM;
}
static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
struct rbd_obj_request *orig_request;
struct page **pages;
u32 page_count;
int result;
u64 obj_size;
u64 xferred;
rbd_assert(img_request_child_test(img_request));
/* First get what we need from the image request */
pages = img_request->copyup_pages;
rbd_assert(pages != NULL);
img_request->copyup_pages = NULL;
orig_request = img_request->obj_request;
rbd_assert(orig_request != NULL);
result = img_request->result;
obj_size = img_request->length;
xferred = img_request->xferred;
rbd_img_request_put(img_request);
obj_request_existence_set(orig_request, true);
page_count = (u32)calc_pages_for(0, obj_size);
ceph_release_page_vector(pages, page_count);
/* Resubmit the original request (for now). */
orig_request->result = rbd_img_obj_request_submit(orig_request);
if (orig_request->result) {
obj_request_done_set(orig_request);
rbd_obj_request_complete(orig_request);
}
}
/*
* Read from the parent image the range of data that covers the
* entire target of the given object request. This is used for
* satisfying a layered image write request when the target of an
* object request from the image request does not exist.
*
* A page array big enough to hold the returned data is allocated
* and supplied to rbd_img_request_fill() as the "data descriptor."
* When the read completes, this page array will be transferred to
* the original object request for the copyup operation.
*
* If an error occurs, record it as the result of the original
* object request and mark it done so it gets completed.
*/
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request = NULL;
struct rbd_img_request *parent_request = NULL;
struct rbd_device *rbd_dev;
u64 img_offset;
u64 length;
struct page **pages = NULL;
u32 page_count;
int result;
rbd_assert(obj_request_img_data_test(obj_request));
rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
img_request = obj_request->img_request;
rbd_assert(img_request != NULL);
rbd_dev = img_request->rbd_dev;
rbd_assert(rbd_dev->parent != NULL);
/*
* Determine the byte range covered by the object in the
* child image to which the original request was to be sent.
*/
img_offset = obj_request->img_offset - obj_request->offset;
length = (u64)1 << rbd_dev->header.obj_order;
/*
* Allocate a page array big enough to receive the data read
* from the parent.
*/
page_count = (u32)calc_pages_for(0, length);
pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
if (IS_ERR(pages)) {
result = PTR_ERR(pages);
pages = NULL;
goto out_err;
}
result = -ENOMEM;
parent_request = rbd_img_request_create(rbd_dev->parent,
img_offset, length,
false, true);
if (!parent_request)
goto out_err;
rbd_obj_request_get(obj_request);
parent_request->obj_request = obj_request;
result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
if (result)
goto out_err;
parent_request->copyup_pages = pages;
parent_request->callback = rbd_img_obj_parent_read_full_callback;
result = rbd_img_request_submit(parent_request);
if (!result)
return 0;
parent_request->copyup_pages = NULL;
parent_request->obj_request = NULL;
rbd_obj_request_put(obj_request);
out_err:
if (pages)
ceph_release_page_vector(pages, page_count);
if (parent_request)
rbd_img_request_put(parent_request);
obj_request->result = result;
obj_request->xferred = 0;
obj_request_done_set(obj_request);
return result;
}
static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
struct rbd_obj_request *orig_request;
......@@ -1996,7 +2126,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
obj_request_existence_set(orig_request, false);
} else if (result) {
orig_request->result = result;
goto out_err;
goto out;
}
/*
......@@ -2004,7 +2134,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
* whether the target object exists.
*/
orig_request->result = rbd_img_obj_request_submit(orig_request);
out_err:
out:
if (orig_request->result)
rbd_obj_request_complete(orig_request);
rbd_obj_request_put(orig_request);
......@@ -2070,15 +2200,13 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request;
bool known;
rbd_assert(obj_request_img_data_test(obj_request));
img_request = obj_request->img_request;
rbd_assert(img_request);
/* (At the moment we don't care whether it exists or not...) */
(void) obj_request_exists_test;
/*
* Only layered writes need special handling. If it's not a
* layered write, or it is a layered write but we know the
......@@ -2087,7 +2215,8 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
*/
if (!img_request_write_test(img_request) ||
!img_request_layered_test(img_request) ||
obj_request_known_test(obj_request)) {
((known = obj_request_known_test(obj_request)) &&
obj_request_exists_test(obj_request))) {
struct rbd_device *rbd_dev;
struct ceph_osd_client *osdc;
......@@ -2099,10 +2228,15 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
}
/*
* It's a layered write and we don't know whether the target
* exists. Issue existence check; once that completes the
* original request will be submitted again.
* It's a layered write. The target object might exist but
* we may not know that yet. If we know it doesn't exist,
* start by reading the data for the full target object from
* the parent so we can use it for a copyup to the target.
*/
if (known)
return rbd_img_obj_parent_read_full(obj_request);
/* We don't know whether the target exists. Go find out. */
return rbd_img_obj_exists_submit(obj_request);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册