diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c index a0045ac880406306a12c19d8c8c4e3f4da7bd1a6..5529d392e5d37e042f0f37fade76ca84def2df28 100644 --- a/drivers/block/drbd/drbd_main.c +++ b/drivers/block/drbd/drbd_main.c @@ -2383,6 +2383,73 @@ void drbd_minor_destroy(struct kref *kref) kref_put(&tconn->kref, &conn_destroy); } +/* One global retry thread, if we need to push back some bio and have it + * reinserted through our make request function. + */ +static struct retry_worker { + struct workqueue_struct *wq; + struct work_struct worker; + + spinlock_t lock; + struct list_head writes; +} retry; + +static void do_retry(struct work_struct *ws) +{ + struct retry_worker *retry = container_of(ws, struct retry_worker, worker); + LIST_HEAD(writes); + struct drbd_request *req, *tmp; + + spin_lock_irq(&retry->lock); + list_splice_init(&retry->writes, &writes); + spin_unlock_irq(&retry->lock); + + list_for_each_entry_safe(req, tmp, &writes, tl_requests) { + struct drbd_conf *mdev = req->w.mdev; + struct bio *bio = req->master_bio; + unsigned long start_time = req->start_time; + + /* We have exclusive access to this request object. + * If it had not been RQ_POSTPONED, the code path which queued + * it here would have completed and freed it already. + */ + mempool_free(req, drbd_request_mempool); + + /* A single suspended or otherwise blocking device may stall + * all others as well. Fortunately, this code path is to + * recover from a situation that "should not happen": + * concurrent writes in multi-primary setup. + * In a "normal" lifecycle, this workqueue is supposed to be + * destroyed without ever doing anything. + * If it turns out to be an issue anyways, we can do per + * resource (replication group) or per device (minor) retry + * workqueues instead. + */ + + /* We are not just doing generic_make_request(), + * as we want to keep the start_time information. */ + do { + inc_ap_bio(mdev); + } while(__drbd_make_request(mdev, bio, start_time)); + } +} + +void drbd_restart_write(struct drbd_request *req) +{ + unsigned long flags; + spin_lock_irqsave(&retry.lock, flags); + list_move_tail(&req->tl_requests, &retry.writes); + spin_unlock_irqrestore(&retry.lock, flags); + + /* Drop the extra reference that would otherwise + * have been dropped by complete_master_bio. + * do_retry() needs to grab a new one. */ + dec_ap_bio(req->w.mdev); + + queue_work(retry.wq, &retry.worker); +} + + static void drbd_cleanup(void) { unsigned int i; @@ -2402,6 +2469,9 @@ static void drbd_cleanup(void) if (drbd_proc) remove_proc_entry("drbd", NULL); + if (retry.wq) + destroy_workqueue(retry.wq); + drbd_genl_unregister(); idr_for_each_entry(&minors, mdev, i) { @@ -2851,6 +2921,15 @@ int __init drbd_init(void) rwlock_init(&global_state_lock); INIT_LIST_HEAD(&drbd_tconns); + retry.wq = create_singlethread_workqueue("drbd-reissue"); + if (!retry.wq) { + printk(KERN_ERR "drbd: unable to create retry workqueue\n"); + goto fail; + } + INIT_WORK(&retry.worker, do_retry); + spin_lock_init(&retry.lock); + INIT_LIST_HEAD(&retry.writes); + printk(KERN_INFO "drbd: initialized. " "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n", API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX); diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c index 8a7f61ba74a89e08406f150de0393f0df116313c..b159ad15abe5f8a60fa5b311ccba730a276a95b9 100644 --- a/drivers/block/drbd/drbd_receiver.c +++ b/drivers/block/drbd/drbd_receiver.c @@ -1748,30 +1748,6 @@ static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi) return err; } -static int w_restart_write(struct drbd_work *w, int cancel) -{ - struct drbd_request *req = container_of(w, struct drbd_request, w); - struct drbd_conf *mdev = w->mdev; - struct bio *bio; - unsigned long start_time; - unsigned long flags; - - spin_lock_irqsave(&mdev->tconn->req_lock, flags); - if (!expect(req->rq_state & RQ_POSTPONED)) { - spin_unlock_irqrestore(&mdev->tconn->req_lock, flags); - return -EIO; - } - bio = req->master_bio; - start_time = req->start_time; - /* Postponed requests will not have their master_bio completed! */ - __req_mod(req, DISCARD_WRITE, NULL); - spin_unlock_irqrestore(&mdev->tconn->req_lock, flags); - - while (__drbd_make_request(mdev, bio, start_time)) - /* retry */ ; - return 0; -} - static void restart_conflicting_writes(struct drbd_conf *mdev, sector_t sector, int size) { @@ -1785,11 +1761,9 @@ static void restart_conflicting_writes(struct drbd_conf *mdev, if (req->rq_state & RQ_LOCAL_PENDING || !(req->rq_state & RQ_POSTPONED)) continue; - if (expect(list_empty(&req->w.list))) { - req->w.mdev = mdev; - req->w.cb = w_restart_write; - drbd_queue_work(&mdev->tconn->data.work, &req->w); - } + /* as it is RQ_POSTPONED, this will cause it to + * be queued on the retry workqueue. */ + __req_mod(req, DISCARD_WRITE, NULL); } } diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c index c3f99bde0e1125d920bf3a6cee3e04092ad5265e..5f4436c3abb3d3388c393df2980adfe09e7338fd 100644 --- a/drivers/block/drbd/drbd_req.c +++ b/drivers/block/drbd/drbd_req.c @@ -104,7 +104,7 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const * and never sent), it should still be "empty" as * initialized in drbd_req_new(), so we can list_del() it * here unconditionally */ - list_del(&req->tl_requests); + list_del_init(&req->tl_requests); /* if it was a write, we may have to set the corresponding * bit(s) out-of-sync first. If it had a local part, we need to @@ -143,7 +143,10 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const } } - drbd_req_free(req); + if (s & RQ_POSTPONED) + drbd_restart_write(req); + else + drbd_req_free(req); } static void queue_barrier(struct drbd_conf *mdev) @@ -289,8 +292,16 @@ void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m) if (!(s & RQ_POSTPONED)) { m->error = ok ? 0 : (error ?: -EIO); m->bio = req->master_bio; + req->master_bio = NULL; + } else { + /* Assert that this will be _req_is_done() + * with this very invokation. */ + /* FIXME: + * what about (RQ_LOCAL_PENDING | RQ_LOCAL_ABORTED)? + */ + D_ASSERT(!(s & RQ_LOCAL_PENDING)); + D_ASSERT(s & RQ_NET_DONE); } - req->master_bio = NULL; } if (s & RQ_LOCAL_PENDING) diff --git a/drivers/block/drbd/drbd_req.h b/drivers/block/drbd/drbd_req.h index 68f54050b7ca4459411f85f5820eec66e49dce29..492f81d3765a16ca3503ddb37ccf906b64932f20 100644 --- a/drivers/block/drbd/drbd_req.h +++ b/drivers/block/drbd/drbd_req.h @@ -268,6 +268,9 @@ extern void request_timer_fn(unsigned long data); extern void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what); extern void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what); +/* this is in drbd_main.c */ +extern void drbd_restart_write(struct drbd_request *req); + /* use this if you don't want to deal with calling complete_master_bio() * outside the spinlock, e.g. when walking some list on cleanup. */ static inline int _req_mod(struct drbd_request *req, enum drbd_req_event what)