diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index b2819deced6b43c36b49f08d39b9cf8ce58a095f..639dd91e7dab8c7244464dcc73e86f694350cba4 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -250,6 +250,7 @@ struct rbd_img_request { struct request *rq; /* block request */ struct rbd_obj_request *obj_request; /* obj req initiator */ }; + struct page **copyup_pages; spinlock_t completion_lock;/* protects next_completion */ u32 next_completion; rbd_img_callback_t callback; @@ -350,6 +351,8 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock); static LIST_HEAD(rbd_client_list); /* clients */ static DEFINE_SPINLOCK(rbd_client_list_lock); +static int rbd_img_request_submit(struct rbd_img_request *img_request); + static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); @@ -1956,6 +1959,133 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, return -ENOMEM; } +static void +rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) +{ + struct rbd_obj_request *orig_request; + struct page **pages; + u32 page_count; + int result; + u64 obj_size; + u64 xferred; + + rbd_assert(img_request_child_test(img_request)); + + /* First get what we need from the image request */ + + pages = img_request->copyup_pages; + rbd_assert(pages != NULL); + img_request->copyup_pages = NULL; + + orig_request = img_request->obj_request; + rbd_assert(orig_request != NULL); + + result = img_request->result; + obj_size = img_request->length; + xferred = img_request->xferred; + + rbd_img_request_put(img_request); + + obj_request_existence_set(orig_request, true); + + page_count = (u32)calc_pages_for(0, obj_size); + ceph_release_page_vector(pages, page_count); + + /* Resubmit the original request (for now). */ + + orig_request->result = rbd_img_obj_request_submit(orig_request); + if (orig_request->result) { + obj_request_done_set(orig_request); + rbd_obj_request_complete(orig_request); + } +} + +/* + * Read from the parent image the range of data that covers the + * entire target of the given object request. This is used for + * satisfying a layered image write request when the target of an + * object request from the image request does not exist. + * + * A page array big enough to hold the returned data is allocated + * and supplied to rbd_img_request_fill() as the "data descriptor." + * When the read completes, this page array will be transferred to + * the original object request for the copyup operation. + * + * If an error occurs, record it as the result of the original + * object request and mark it done so it gets completed. + */ +static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request = NULL; + struct rbd_img_request *parent_request = NULL; + struct rbd_device *rbd_dev; + u64 img_offset; + u64 length; + struct page **pages = NULL; + u32 page_count; + int result; + + rbd_assert(obj_request_img_data_test(obj_request)); + rbd_assert(obj_request->type == OBJ_REQUEST_BIO); + + img_request = obj_request->img_request; + rbd_assert(img_request != NULL); + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev->parent != NULL); + + /* + * Determine the byte range covered by the object in the + * child image to which the original request was to be sent. + */ + img_offset = obj_request->img_offset - obj_request->offset; + length = (u64)1 << rbd_dev->header.obj_order; + + /* + * Allocate a page array big enough to receive the data read + * from the parent. + */ + page_count = (u32)calc_pages_for(0, length); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) { + result = PTR_ERR(pages); + pages = NULL; + goto out_err; + } + + result = -ENOMEM; + parent_request = rbd_img_request_create(rbd_dev->parent, + img_offset, length, + false, true); + if (!parent_request) + goto out_err; + rbd_obj_request_get(obj_request); + parent_request->obj_request = obj_request; + + result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); + if (result) + goto out_err; + parent_request->copyup_pages = pages; + + parent_request->callback = rbd_img_obj_parent_read_full_callback; + result = rbd_img_request_submit(parent_request); + if (!result) + return 0; + + parent_request->copyup_pages = NULL; + parent_request->obj_request = NULL; + rbd_obj_request_put(obj_request); +out_err: + if (pages) + ceph_release_page_vector(pages, page_count); + if (parent_request) + rbd_img_request_put(parent_request); + obj_request->result = result; + obj_request->xferred = 0; + obj_request_done_set(obj_request); + + return result; +} + static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) { struct rbd_obj_request *orig_request; @@ -1996,7 +2126,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) obj_request_existence_set(orig_request, false); } else if (result) { orig_request->result = result; - goto out_err; + goto out; } /* @@ -2004,7 +2134,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) * whether the target object exists. */ orig_request->result = rbd_img_obj_request_submit(orig_request); -out_err: +out: if (orig_request->result) rbd_obj_request_complete(orig_request); rbd_obj_request_put(orig_request); @@ -2070,15 +2200,13 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request; + bool known; rbd_assert(obj_request_img_data_test(obj_request)); img_request = obj_request->img_request; rbd_assert(img_request); - /* (At the moment we don't care whether it exists or not...) */ - (void) obj_request_exists_test; - /* * Only layered writes need special handling. If it's not a * layered write, or it is a layered write but we know the @@ -2087,7 +2215,8 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) */ if (!img_request_write_test(img_request) || !img_request_layered_test(img_request) || - obj_request_known_test(obj_request)) { + ((known = obj_request_known_test(obj_request)) && + obj_request_exists_test(obj_request))) { struct rbd_device *rbd_dev; struct ceph_osd_client *osdc; @@ -2099,10 +2228,15 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) } /* - * It's a layered write and we don't know whether the target - * exists. Issue existence check; once that completes the - * original request will be submitted again. + * It's a layered write. The target object might exist but + * we may not know that yet. If we know it doesn't exist, + * start by reading the data for the full target object from + * the parent so we can use it for a copyup to the target. */ + if (known) + return rbd_img_obj_parent_read_full(obj_request); + + /* We don't know whether the target exists. Go find out. */ return rbd_img_obj_exists_submit(obj_request); }