diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index a1bb8f3100a886ec00ccfab585ef40c62cc76992..6d1df82eb88374823d99a3610ed194096bff3200 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -312,6 +312,7 @@ enum img_req_flags { enum rbd_img_state { RBD_IMG_START = 1, + RBD_IMG_EXCLUSIVE_LOCK, __RBD_IMG_OBJECT_REQUESTS, RBD_IMG_OBJECT_REQUESTS, }; @@ -412,9 +413,11 @@ struct rbd_device { struct delayed_work lock_dwork; struct work_struct unlock_work; spinlock_t lock_lists_lock; + struct list_head acquiring_list; struct list_head running_list; + struct completion acquire_wait; + int acquire_err; struct completion releasing_wait; - wait_queue_head_t lock_waitq; struct workqueue_struct *task_wq; @@ -442,12 +445,10 @@ struct rbd_device { * Flag bits for rbd_dev->flags: * - REMOVING (which is coupled with rbd_dev->open_count) is protected * by rbd_dev->lock - * - BLACKLISTED is protected by rbd_dev->lock_rwsem */ enum rbd_dev_flags { RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ - RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */ }; static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ @@ -500,6 +501,8 @@ static int minor_to_rbd_dev_id(int minor) static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) { + lockdep_assert_held(&rbd_dev->lock_rwsem); + return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; } @@ -2895,15 +2898,21 @@ static bool need_exclusive_lock(struct rbd_img_request *img_req) return rbd_img_is_write(img_req); } -static void rbd_lock_add_request(struct rbd_img_request *img_req) +static bool rbd_lock_add_request(struct rbd_img_request *img_req) { struct rbd_device *rbd_dev = img_req->rbd_dev; + bool locked; lockdep_assert_held(&rbd_dev->lock_rwsem); + locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED; spin_lock(&rbd_dev->lock_lists_lock); rbd_assert(list_empty(&img_req->lock_item)); - list_add_tail(&img_req->lock_item, &rbd_dev->running_list); + if (!locked) + list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list); + else + list_add_tail(&img_req->lock_item, &rbd_dev->running_list); spin_unlock(&rbd_dev->lock_lists_lock); + return locked; } static void rbd_lock_del_request(struct rbd_img_request *img_req) @@ -2922,6 +2931,30 @@ static void rbd_lock_del_request(struct rbd_img_request *img_req) complete(&rbd_dev->releasing_wait); } +static int rbd_img_exclusive_lock(struct rbd_img_request *img_req) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + + if (!need_exclusive_lock(img_req)) + return 1; + + if (rbd_lock_add_request(img_req)) + return 1; + + if (rbd_dev->opts->exclusive) { + WARN_ON(1); /* lock got released? */ + return -EROFS; + } + + /* + * Note the use of mod_delayed_work() in rbd_acquire_lock() + * and cancel_delayed_work() in wake_lock_waiters(). + */ + dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + return 0; +} + static void rbd_img_object_requests(struct rbd_img_request *img_req) { struct rbd_obj_request *obj_req; @@ -2944,11 +2977,30 @@ static void rbd_img_object_requests(struct rbd_img_request *img_req) static bool rbd_img_advance(struct rbd_img_request *img_req, int *result) { + struct rbd_device *rbd_dev = img_req->rbd_dev; + int ret; + again: switch (img_req->state) { case RBD_IMG_START: rbd_assert(!*result); + ret = rbd_img_exclusive_lock(img_req); + if (ret < 0) { + *result = ret; + return true; + } + img_req->state = RBD_IMG_EXCLUSIVE_LOCK; + if (ret > 0) + goto again; + return false; + case RBD_IMG_EXCLUSIVE_LOCK: + if (*result) + return true; + + rbd_assert(!need_exclusive_lock(img_req) || + __rbd_is_lock_owner(rbd_dev)); + rbd_img_object_requests(img_req); if (!img_req->pending.num_pending) { *result = img_req->pending.result; @@ -3107,7 +3159,7 @@ static void rbd_unlock(struct rbd_device *rbd_dev) ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, rbd_dev->lock_cookie); if (ret && ret != -ENOENT) - rbd_warn(rbd_dev, "failed to unlock: %d", ret); + rbd_warn(rbd_dev, "failed to unlock header: %d", ret); /* treat errors as the image is unlocked */ rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; @@ -3234,15 +3286,34 @@ static int rbd_request_lock(struct rbd_device *rbd_dev) goto out; } -static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) +/* + * Either image request state machine(s) or rbd_add_acquire_lock() + * (i.e. "rbd map"). + */ +static void wake_lock_waiters(struct rbd_device *rbd_dev, int result) { - dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); + struct rbd_img_request *img_req; + + dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); + lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem); cancel_delayed_work(&rbd_dev->lock_dwork); - if (wake_all) - wake_up_all(&rbd_dev->lock_waitq); - else - wake_up(&rbd_dev->lock_waitq); + if (!completion_done(&rbd_dev->acquire_wait)) { + rbd_assert(list_empty(&rbd_dev->acquiring_list) && + list_empty(&rbd_dev->running_list)); + rbd_dev->acquire_err = result; + complete_all(&rbd_dev->acquire_wait); + return; + } + + list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) { + mutex_lock(&img_req->state_mutex); + rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK); + rbd_img_schedule(img_req, result); + mutex_unlock(&img_req->state_mutex); + } + + list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list); } static int get_lock_owner_info(struct rbd_device *rbd_dev, @@ -3357,11 +3428,8 @@ static int rbd_try_lock(struct rbd_device *rbd_dev) goto again; ret = find_watcher(rbd_dev, lockers); - if (ret) { - if (ret > 0) - ret = 0; /* have to request lock */ - goto out; - } + if (ret) + goto out; /* request lock or error */ rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", ENTITY_NAME(lockers[0].id.name)); @@ -3391,52 +3459,65 @@ static int rbd_try_lock(struct rbd_device *rbd_dev) } /* - * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED + * Return: + * 0 - lock acquired + * 1 - caller should call rbd_request_lock() + * <0 - error */ -static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, - int *pret) +static int rbd_try_acquire_lock(struct rbd_device *rbd_dev) { - enum rbd_lock_state lock_state; + int ret; down_read(&rbd_dev->lock_rwsem); dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, rbd_dev->lock_state); if (__rbd_is_lock_owner(rbd_dev)) { - lock_state = rbd_dev->lock_state; up_read(&rbd_dev->lock_rwsem); - return lock_state; + return 0; } up_read(&rbd_dev->lock_rwsem); down_write(&rbd_dev->lock_rwsem); dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, rbd_dev->lock_state); - if (!__rbd_is_lock_owner(rbd_dev)) { - *pret = rbd_try_lock(rbd_dev); - if (*pret) - rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); + if (__rbd_is_lock_owner(rbd_dev)) { + up_write(&rbd_dev->lock_rwsem); + return 0; } - lock_state = rbd_dev->lock_state; + ret = rbd_try_lock(rbd_dev); + if (ret < 0) { + rbd_warn(rbd_dev, "failed to lock header: %d", ret); + if (ret == -EBLACKLISTED) + goto out; + + ret = 1; /* request lock anyway */ + } + if (ret > 0) { + up_write(&rbd_dev->lock_rwsem); + return ret; + } + + rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED); + rbd_assert(list_empty(&rbd_dev->running_list)); + +out: + wake_lock_waiters(rbd_dev, ret); up_write(&rbd_dev->lock_rwsem); - return lock_state; + return ret; } static void rbd_acquire_lock(struct work_struct *work) { struct rbd_device *rbd_dev = container_of(to_delayed_work(work), struct rbd_device, lock_dwork); - enum rbd_lock_state lock_state; - int ret = 0; + int ret; dout("%s rbd_dev %p\n", __func__, rbd_dev); again: - lock_state = rbd_try_acquire_lock(rbd_dev, &ret); - if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { - if (lock_state == RBD_LOCK_STATE_LOCKED) - wake_requests(rbd_dev, true); - dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, - rbd_dev, lock_state, ret); + ret = rbd_try_acquire_lock(rbd_dev); + if (ret <= 0) { + dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret); return; } @@ -3445,16 +3526,9 @@ static void rbd_acquire_lock(struct work_struct *work) goto again; /* treat this as a dead client */ } else if (ret == -EROFS) { rbd_warn(rbd_dev, "peer will not release lock"); - /* - * If this is rbd_add_acquire_lock(), we want to fail - * immediately -- reuse BLACKLISTED flag. Otherwise we - * want to block. - */ - if (!(rbd_dev->disk->flags & GENHD_FL_UP)) { - set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); - /* wake "rbd map --exclusive" process */ - wake_requests(rbd_dev, false); - } + down_write(&rbd_dev->lock_rwsem); + wake_lock_waiters(rbd_dev, ret); + up_write(&rbd_dev->lock_rwsem); } else if (ret < 0) { rbd_warn(rbd_dev, "error requesting lock: %d", ret); mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, @@ -3519,10 +3593,10 @@ static void rbd_release_lock(struct rbd_device *rbd_dev) /* * Give others a chance to grab the lock - we would re-acquire - * almost immediately if we got new IO during ceph_osdc_sync() - * otherwise. We need to ack our own notifications, so this - * lock_dwork will be requeued from rbd_wait_state_locked() - * after wake_requests() in rbd_handle_released_lock(). + * almost immediately if we got new IO while draining the running + * list otherwise. We need to ack our own notifications, so this + * lock_dwork will be requeued from rbd_handle_released_lock() by + * way of maybe_kick_acquire(). */ cancel_delayed_work(&rbd_dev->lock_dwork); } @@ -3537,6 +3611,23 @@ static void rbd_release_lock_work(struct work_struct *work) up_write(&rbd_dev->lock_rwsem); } +static void maybe_kick_acquire(struct rbd_device *rbd_dev) +{ + bool have_requests; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + if (__rbd_is_lock_owner(rbd_dev)) + return; + + spin_lock(&rbd_dev->lock_lists_lock); + have_requests = !list_empty(&rbd_dev->acquiring_list); + spin_unlock(&rbd_dev->lock_lists_lock); + if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) { + dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + } +} + static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, void **p) { @@ -3566,8 +3657,7 @@ static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, down_read(&rbd_dev->lock_rwsem); } - if (!__rbd_is_lock_owner(rbd_dev)) - wake_requests(rbd_dev, false); + maybe_kick_acquire(rbd_dev); up_read(&rbd_dev->lock_rwsem); } @@ -3599,8 +3689,7 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, down_read(&rbd_dev->lock_rwsem); } - if (!__rbd_is_lock_owner(rbd_dev)) - wake_requests(rbd_dev, false); + maybe_kick_acquire(rbd_dev); up_read(&rbd_dev->lock_rwsem); } @@ -3850,7 +3939,6 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev) static void rbd_unregister_watch(struct rbd_device *rbd_dev) { - WARN_ON(waitqueue_active(&rbd_dev->lock_waitq)); cancel_tasks_sync(rbd_dev); mutex_lock(&rbd_dev->watch_mutex); @@ -3893,6 +3981,7 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev) queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); } else { __rbd_lock(rbd_dev, cookie); + wake_lock_waiters(rbd_dev, 0); } } @@ -3913,15 +4002,18 @@ static void rbd_reregister_watch(struct work_struct *work) ret = __rbd_register_watch(rbd_dev); if (ret) { rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); - if (ret == -EBLACKLISTED || ret == -ENOENT) { - set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); - wake_requests(rbd_dev, true); - } else { + if (ret != -EBLACKLISTED && ret != -ENOENT) { queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, RBD_RETRY_DELAY); + mutex_unlock(&rbd_dev->watch_mutex); + return; } + mutex_unlock(&rbd_dev->watch_mutex); + down_write(&rbd_dev->lock_rwsem); + wake_lock_waiters(rbd_dev, ret); + up_write(&rbd_dev->lock_rwsem); return; } @@ -3996,54 +4088,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, return ret; } -/* - * lock_rwsem must be held for read - */ -static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire) -{ - DEFINE_WAIT(wait); - unsigned long timeout; - int ret = 0; - - if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) - return -EBLACKLISTED; - - if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) - return 0; - - if (!may_acquire) { - rbd_warn(rbd_dev, "exclusive lock required"); - return -EROFS; - } - - do { - /* - * Note the use of mod_delayed_work() in rbd_acquire_lock() - * and cancel_delayed_work() in wake_requests(). - */ - dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); - queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); - prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, - TASK_UNINTERRUPTIBLE); - up_read(&rbd_dev->lock_rwsem); - timeout = schedule_timeout(ceph_timeout_jiffies( - rbd_dev->opts->lock_timeout)); - down_read(&rbd_dev->lock_rwsem); - if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { - ret = -EBLACKLISTED; - break; - } - if (!timeout) { - rbd_warn(rbd_dev, "timed out waiting for lock"); - ret = -ETIMEDOUT; - break; - } - } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); - - finish_wait(&rbd_dev->lock_waitq, &wait); - return ret; -} - static void rbd_queue_workfn(struct work_struct *work) { struct request *rq = blk_mq_rq_from_pdu(work); @@ -4054,7 +4098,6 @@ static void rbd_queue_workfn(struct work_struct *work) u64 length = blk_rq_bytes(rq); enum obj_operation_type op_type; u64 mapping_size; - bool must_be_locked; int result; switch (req_op(rq)) { @@ -4128,21 +4171,10 @@ static void rbd_queue_workfn(struct work_struct *work) goto err_rq; } - must_be_locked = - (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && - (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read); - if (must_be_locked) { - down_read(&rbd_dev->lock_rwsem); - result = rbd_wait_state_locked(rbd_dev, - !rbd_dev->opts->exclusive); - if (result) - goto err_unlock; - } - img_request = rbd_img_request_create(rbd_dev, op_type, snapc); if (!img_request) { result = -ENOMEM; - goto err_unlock; + goto err_rq; } img_request->rq = rq; snapc = NULL; /* img_request consumes a ref */ @@ -4155,19 +4187,11 @@ static void rbd_queue_workfn(struct work_struct *work) if (result) goto err_img_request; - if (must_be_locked) { - rbd_lock_add_request(img_request); - up_read(&rbd_dev->lock_rwsem); - } - rbd_img_handle_request(img_request, 0); return; err_img_request: rbd_img_request_put(img_request); -err_unlock: - if (must_be_locked) - up_read(&rbd_dev->lock_rwsem); err_rq: if (result) rbd_warn(rbd_dev, "%s %llx at %llx result %d", @@ -4835,9 +4859,10 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); spin_lock_init(&rbd_dev->lock_lists_lock); + INIT_LIST_HEAD(&rbd_dev->acquiring_list); INIT_LIST_HEAD(&rbd_dev->running_list); + init_completion(&rbd_dev->acquire_wait); init_completion(&rbd_dev->releasing_wait); - init_waitqueue_head(&rbd_dev->lock_waitq); rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; @@ -5857,24 +5882,45 @@ static void rbd_dev_image_unlock(struct rbd_device *rbd_dev) up_write(&rbd_dev->lock_rwsem); } +/* + * If the wait is interrupted, an error is returned even if the lock + * was successfully acquired. rbd_dev_image_unlock() will release it + * if needed. + */ static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) { - int ret; + long ret; if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) { + if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read) + return 0; + rbd_warn(rbd_dev, "exclusive-lock feature is not enabled"); return -EINVAL; } - /* FIXME: "rbd map --exclusive" should be in interruptible */ - down_read(&rbd_dev->lock_rwsem); - ret = rbd_wait_state_locked(rbd_dev, true); - up_read(&rbd_dev->lock_rwsem); + if (rbd_dev->spec->snap_id != CEPH_NOSNAP) + return 0; + + rbd_assert(!rbd_is_lock_owner(rbd_dev)); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait, + ceph_timeout_jiffies(rbd_dev->opts->lock_timeout)); + if (ret > 0) + ret = rbd_dev->acquire_err; + else if (!ret) + ret = -ETIMEDOUT; + if (ret) { - rbd_warn(rbd_dev, "failed to acquire exclusive lock"); - return -EROFS; + rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret); + return ret; } + /* + * The lock may have been released by now, unless automatic lock + * transitions are disabled. + */ + rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev)); return 0; } @@ -6319,11 +6365,9 @@ static ssize_t do_rbd_add(struct bus_type *bus, if (rc) goto err_out_image_probe; - if (rbd_dev->opts->exclusive) { - rc = rbd_add_acquire_lock(rbd_dev); - if (rc) - goto err_out_device_setup; - } + rc = rbd_add_acquire_lock(rbd_dev); + if (rc) + goto err_out_image_lock; /* Everything's ready. Announce the disk to the world. */ @@ -6349,7 +6393,6 @@ static ssize_t do_rbd_add(struct bus_type *bus, err_out_image_lock: rbd_dev_image_unlock(rbd_dev); -err_out_device_setup: rbd_dev_device_release(rbd_dev); err_out_image_probe: rbd_dev_image_release(rbd_dev);