diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 639dd91e7dab8c7244464dcc73e86f694350cba4..c34719c917b1cb8a710fc0b9be12c2e3bfa6ae1d 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -218,6 +218,7 @@ struct rbd_obj_request { u32 page_count; }; }; + struct page **copyup_pages; struct ceph_osd_request *osd_req; @@ -1498,7 +1499,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, obj_request->result = osd_req->r_result; obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version); - WARN_ON(osd_req->r_num_ops != 1); /* For now */ + BUG_ON(osd_req->r_num_ops > 2); /* * We support a 64-bit length, but ultimately it has to be @@ -1601,6 +1602,48 @@ static struct ceph_osd_request *rbd_osd_req_create( return osd_req; } +/* + * Create a copyup osd request based on the information in the + * object request supplied. A copyup request has two osd ops, + * a copyup method call, and a "normal" write request. + */ +static struct ceph_osd_request * +rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + struct ceph_snap_context *snapc; + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + struct ceph_osd_request *osd_req; + + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + rbd_assert(img_request); + rbd_assert(img_request_write_test(img_request)); + + /* Allocate and initialize the request, for the two ops */ + + snapc = img_request->snapc; + rbd_dev = img_request->rbd_dev; + osdc = &rbd_dev->rbd_client->client->osdc; + osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC); + if (!osd_req) + return NULL; /* ENOMEM */ + + osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + osd_req->r_callback = rbd_osd_req_callback; + osd_req->r_priv = obj_request; + + osd_req->r_oid_len = strlen(obj_request->object_name); + rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); + memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); + + osd_req->r_file_layout = rbd_dev->layout; /* struct */ + + return osd_req; +} + + static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) { ceph_osdc_put_request(osd_req); @@ -1959,12 +2002,50 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, return -ENOMEM; } +static void +rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + struct rbd_device *rbd_dev; + u64 length; + u32 page_count; + + rbd_assert(obj_request->type == OBJ_REQUEST_BIO); + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + rbd_assert(img_request); + + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev); + length = (u64)1 << rbd_dev->header.obj_order; + page_count = (u32)calc_pages_for(0, length); + + rbd_assert(obj_request->copyup_pages); + ceph_release_page_vector(obj_request->copyup_pages, page_count); + obj_request->copyup_pages = NULL; + + /* + * We want the transfer count to reflect the size of the + * original write request. There is no such thing as a + * successful short write, so if the request was successful + * we can just set it to the originally-requested length. + */ + if (!obj_request->result) + obj_request->xferred = obj_request->length; + + /* Finish up with the normal image object callback */ + + rbd_img_obj_callback(obj_request); +} + static void rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) { struct rbd_obj_request *orig_request; + struct ceph_osd_request *osd_req; + struct ceph_osd_client *osdc; + struct rbd_device *rbd_dev; struct page **pages; - u32 page_count; int result; u64 obj_size; u64 xferred; @@ -1979,25 +2060,60 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) orig_request = img_request->obj_request; rbd_assert(orig_request != NULL); - + rbd_assert(orig_request->type == OBJ_REQUEST_BIO); result = img_request->result; obj_size = img_request->length; xferred = img_request->xferred; + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev); + rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order); + rbd_img_request_put(img_request); - obj_request_existence_set(orig_request, true); + if (result) + goto out_err; + + /* Allocate the new copyup osd request for the original request */ - page_count = (u32)calc_pages_for(0, obj_size); - ceph_release_page_vector(pages, page_count); + result = -ENOMEM; + rbd_assert(!orig_request->osd_req); + osd_req = rbd_osd_req_create_copyup(orig_request); + if (!osd_req) + goto out_err; + orig_request->osd_req = osd_req; + orig_request->copyup_pages = pages; - /* Resubmit the original request (for now). */ + /* Initialize the copyup op */ - orig_request->result = rbd_img_obj_request_submit(orig_request); - if (orig_request->result) { - obj_request_done_set(orig_request); - rbd_obj_request_complete(orig_request); - } + osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup"); + osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0, + false, false); + + /* Then the original write request op */ + + osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE, + orig_request->offset, + orig_request->length, 0, 0); + osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list, + orig_request->length); + + rbd_osd_req_format_write(orig_request); + + /* All set, send it off. */ + + orig_request->callback = rbd_img_obj_copyup_callback; + osdc = &rbd_dev->rbd_client->client->osdc; + result = rbd_obj_request_submit(osdc, orig_request); + if (!result) + return; +out_err: + /* Record the error code and complete the request */ + + orig_request->result = result; + orig_request->xferred = 0; + obj_request_done_set(orig_request); + rbd_obj_request_complete(orig_request); } /* @@ -2033,6 +2149,15 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) rbd_dev = img_request->rbd_dev; rbd_assert(rbd_dev->parent != NULL); + /* + * First things first. The original osd request is of no + * use to use any more, we'll need a new one that can hold + * the two ops in a copyup request. We'll get that later, + * but for now we can release the old one. + */ + rbd_osd_req_destroy(obj_request->osd_req); + obj_request->osd_req = NULL; + /* * Determine the byte range covered by the object in the * child image to which the original request was to be sent.